[PIO-116] PySpark Support

Closes #427


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/df406bf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/df406bf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/df406bf9

Branch: refs/heads/livedoc
Commit: df406bf92463da4a79c8d84ec0ca439feaa0ec7f
Parents: 69c5e3b
Author: Shinsuke Sugaya <[email protected]>
Authored: Mon Sep 11 12:05:18 2017 +0900
Committer: Shinsuke Sugaya <[email protected]>
Committed: Mon Sep 11 12:05:18 2017 +0900

----------------------------------------------------------------------
 bin/pio-shell                                   |   9 ++
 .../data/store/python/PPythonEventStore.scala   | 146 +++++++++++++++++++
 make-distribution.sh                            |   2 +
 python/pypio/__init__.py                        |  20 +++
 python/pypio/data/__init__.py                   |  25 ++++
 python/pypio/data/eventstore.py                 |  50 +++++++
 python/pypio/shell.py                           |  21 +++
 python/pypio/utils.py                           |  27 ++++
 .../apache/predictionio/tools/RunWorkflow.scala |   7 +-
 .../org/apache/predictionio/tools/Runner.scala  |  47 ++++--
 .../predictionio/tools/console/Console.scala    |   3 +
 11 files changed, 346 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/bin/pio-shell
----------------------------------------------------------------------
diff --git a/bin/pio-shell b/bin/pio-shell
index 5a5745a..cd119cd 100755
--- a/bin/pio-shell
+++ b/bin/pio-shell
@@ -59,6 +59,15 @@ then
   . ${PIO_HOME}/bin/compute-classpath.sh
   shift
   ${SPARK_HOME}/bin/spark-shell --jars ${ASSEMBLY_JARS} $@
+elif [[ "$1" == "--with-pyspark" ]]
+then
+  echo "Starting the PIO shell with the Apache Spark Shell."
+  # Get paths of assembly jars to pass to pyspark
+  . ${PIO_HOME}/bin/compute-classpath.sh
+  shift
+  export PYTHONSTARTUP=${PIO_HOME}/python/pypio/shell.py
+  export PYTHONPATH=${PIO_HOME}/python
+  ${SPARK_HOME}/bin/pyspark --jars ${ASSEMBLY_JARS} $@
 else
   echo -e "\033[0;33mStarting the PIO shell without Apache Spark.\033[0m"
   echo -e "\033[0;33mIf you need the Apache Spark library, run 'pio-shell 
--with-spark [spark-submit arguments...]'.\033[0m"

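The new branch mirrors the existing --with-spark case but launches pyspark instead, with PYTHONSTARTUP pointing at pypio/shell.py so that an event-store client is already wired up when the prompt appears. A minimal session sketch, assuming events have been imported for a hypothetical app named "MyApp":

    $ pio-shell --with-pyspark
    >>> df = p_event_store.find("MyApp")   # p_event_store is defined by pypio/shell.py
    >>> df.show(5)                         # an ordinary PySpark DataFrame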
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
new file mode 100644
index 0000000..1d03634
--- /dev/null
+++ b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.store.python
+
+import java.sql.Timestamp
+
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.joda.time.DateTime
+
+
+/** This object provides a set of operations to access the Event Store
+  * with Spark's parallelization
+  */
+object PPythonEventStore {
+
+
+  /** Read events from Event Store
+    *
+    * @param appName          return events of this app
+    * @param channelName      return events of this channel (default channel if it's None)
+    * @param startTime        return events with eventTime >= startTime
+    * @param untilTime        return events with eventTime < untilTime
+    * @param entityType       return events of this entityType
+    * @param entityId         return events of this entityId
+    * @param eventNames       return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId   return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param spark            Spark session
+    * @return DataFrame
+    */
+  def find(
+            appName: String,
+            channelName: String,
+            startTime: Timestamp,
+            untilTime: Timestamp,
+            entityType: String,
+            entityId: String,
+            eventNames: Array[String],
+            targetEntityType: String,
+            targetEntityId: String
+          )(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "eventId",
+        "event",
+        "entityType",
+        "entityId",
+        "targetEntityType",
+        "targetEntityId",
+        "eventTime",
+        "tags",
+        "prId",
+        "creationTime",
+        "fields"
+      )
+    PEventStore.find(appName,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(entityType),
+      Option(entityId),
+      Option(eventNames),
+      Option(Option(targetEntityType)),
+      Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
+      (
+        e.eventId,
+        e.event,
+        e.entityType,
+        e.entityId,
+        e.targetEntityType.orNull,
+        e.targetEntityId.orNull,
+        new Timestamp(e.eventTime.getMillis),
+        e.tags.mkString("\t"),
+        e.prId.orNull,
+        new Timestamp(e.creationTime.getMillis),
+        e.properties.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName     use events of this app
+    * @param entityType  aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime   use events with eventTime >= startTime
+    * @param untilTime   use events with eventTime < untilTime
+    * @param required    only keep entities with these required properties defined
+    * @param spark       Spark session
+    * @return DataFrame of entityId and PropertyMap pairs
+    */
+  def aggregateProperties(
+                           appName: String,
+                           entityType: String,
+                           channelName: String,
+                           startTime: Timestamp,
+                           untilTime: Timestamp,
+                           required: Array[String]
+                         )
+                         (spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "entityId",
+        "firstUpdated",
+        "lastUpdated",
+        "fields"
+      )
+    PEventStore.aggregateProperties(appName,
+      entityType,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(required.toSeq))(spark.sparkContext).map { x =>
+      val m = x._2
+      (x._1,
+        new Timestamp(m.firstUpdated.getMillis),
+        new Timestamp(m.lastUpdated.getMillis),
+        m.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+}

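PPythonEventStore is a py4j-friendly facade over PEventStore: it takes plain, nullable strings and java.sql.Timestamps instead of Option-wrapped values, re-wraps them with Option(...) on the Scala side, and returns a Spark DataFrame rather than an RDD[Event]. A Python process could in principle call it straight through the JVM gateway; a hypothetical sketch, assuming a pyspark session with the PredictionIO assembly on the classpath and an app named "MyApp":

    # Normally this is wrapped by pypio.data.PEventStore; shown here only to
    # illustrate the calling convention. Python None crosses py4j as null, and
    # Option(null) becomes None on the Scala side.
    pes = sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
    jdf = pes.find("MyApp", None, None, None, None, None, None, None, None,
                   spark._jsparkSession)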
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index bf4c7ce..7a34274 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -83,6 +83,7 @@ cd ${FWDIR}
 rm -rf ${DISTDIR}
 mkdir -p ${DISTDIR}/bin
 mkdir -p ${DISTDIR}/conf
+mkdir -p ${DISTDIR}/python
 mkdir -p ${DISTDIR}/lib
 mkdir -p ${DISTDIR}/lib/spark
 mkdir -p ${DISTDIR}/project
@@ -91,6 +92,7 @@ mkdir -p ${DISTDIR}/sbt
 
 cp ${FWDIR}/bin/* ${DISTDIR}/bin || :
 cp ${FWDIR}/conf/* ${DISTDIR}/conf
+cp -r ${FWDIR}/python/* ${DISTDIR}/python
 cp ${FWDIR}/project/build.properties ${DISTDIR}/project
 cp ${FWDIR}/sbt/sbt ${DISTDIR}/sbt
 cp ${FWDIR}/assembly/src/universal/lib/*assembly*jar ${DISTDIR}/lib

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/__init__.py
----------------------------------------------------------------------
diff --git a/python/pypio/__init__.py b/python/pypio/__init__.py
new file mode 100644
index 0000000..04d8ac3
--- /dev/null
+++ b/python/pypio/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PyPIO is the Python API for PredictionIO.
+"""

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/data/__init__.py
----------------------------------------------------------------------
diff --git a/python/pypio/data/__init__.py b/python/pypio/data/__init__.py
new file mode 100644
index 0000000..63a6442
--- /dev/null
+++ b/python/pypio/data/__init__.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+from pypio.data.eventstore import PEventStore
+
+
+__all__ = [
+    'PEventStore'
+]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/data/eventstore.py
----------------------------------------------------------------------
diff --git a/python/pypio/data/eventstore.py b/python/pypio/data/eventstore.py
new file mode 100644
index 0000000..4eb73df
--- /dev/null
+++ b/python/pypio/data/eventstore.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+from pypio.utils import new_string_array
+from pyspark.sql.dataframe import DataFrame
+
+__all__ = ["PEventStore"]
+
+
+class PEventStore(object):
+
+    def __init__(self, jss, sql_ctx):
+        self._jss = jss
+        self.sql_ctx = sql_ctx
+        self._sc = sql_ctx and sql_ctx._sc
+
+    def find(self, app_name, channel_name=None, start_time=None, until_time=None,
+             entity_type=None, entity_id=None, event_names=None, target_entity_type=None,
+             target_entity_id=None):
+        pes = self._sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
+        jdf = pes.find(app_name, channel_name, start_time, until_time, entity_type, entity_id,
+                       event_names, target_entity_type, target_entity_id, self._jss)
+        return DataFrame(jdf, self.sql_ctx)
+
+    def aggregate_properties(self, app_name, entity_type, channel_name=None,
+                             start_time=None, until_time=None, required=None):
+        pes = self._sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
+        jdf = pes.aggregateProperties(app_name, entity_type, channel_name,
+                                      start_time, until_time,
+                                      new_string_array(required, self._sc._gateway),
+                                      self._jss)
+        return DataFrame(jdf, self.sql_ctx)
+
+

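The wrapper keeps the Python surface close to the Scala API while returning ordinary PySpark DataFrames. A hedged usage sketch, with the app name and entity type as hypothetical placeholders:

    # Assuming p_event_store was constructed as in pypio/shell.py:
    events = p_event_store.find("MyApp", entity_type="user")
    events.groupBy("event").count().show()

    users = p_event_store.aggregate_properties("MyApp", "user")
    users.select("entityId", "firstUpdated", "lastUpdated").show(5)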
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/shell.py
----------------------------------------------------------------------
diff --git a/python/pypio/shell.py b/python/pypio/shell.py
new file mode 100644
index 0000000..94c1e1a
--- /dev/null
+++ b/python/pypio/shell.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pypio.data import PEventStore
+
+p_event_store = PEventStore(spark._jsparkSession, sqlContext)
+

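shell.py depends on the spark and sqlContext globals that the pyspark shell injects, which is why it is hooked in through PYTHONSTARTUP rather than imported as a module. Outside the shell, the same object can presumably be assembled from any SparkSession; a sketch, not part of this commit:

    from pyspark.sql import SparkSession, SQLContext
    from pypio.data import PEventStore

    spark = SparkSession.builder.getOrCreate()
    sql_ctx = SQLContext(spark.sparkContext)  # the shell exposes this as sqlContext
    p_event_store = PEventStore(spark._jsparkSession, sql_ctx)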
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/utils.py
----------------------------------------------------------------------
diff --git a/python/pypio/utils.py b/python/pypio/utils.py
new file mode 100644
index 0000000..76900c3
--- /dev/null
+++ b/python/pypio/utils.py
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+def new_string_array(list_data, gateway):
+    if list_data is None:
+        return None
+    string_class = gateway.jvm.String
+    args = gateway.new_array(string_class, len(list_data))
+    for i in range(len(list_data)):
+        args[i] = list_data[i]
+    return args
+

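new_string_array exists because Scala's Array[String] parameters need an explicit Java String[] on the py4j side; a plain Python list would not match the method signature. A hypothetical round trip in a pyspark session, where sc._gateway is the active py4j JavaGateway:

    arr = new_string_array(["email", "age"], sc._gateway)
    # arr is a py4j JavaArray backed by String[]; a None input passes through as None.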
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index a25f4e0..236d3ba 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -39,6 +39,7 @@ case class WorkflowArgs(
   stopAfterRead: Boolean = false,
   stopAfterPrepare: Boolean = false,
   skipSanityCheck: Boolean = false,
+  mainPyFile: Option[String] = None,
   jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
 
 object RunWorkflow extends Logging {
@@ -85,8 +86,12 @@ object RunWorkflow extends Logging {
       (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++
       Seq("--json-extractor", wa.jsonExtractor.toString)
 
+    val resourceName = wa.mainPyFile match {
+      case Some(x) => x
+      case _ => "org.apache.predictionio.workflow.CreateWorkflow"
+    }
     Runner.runOnSpark(
-      "org.apache.predictionio.workflow.CreateWorkflow",
+      resourceName,
       args,
       sa,
       jarFiles,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
index 4e266c8..8e08b21 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -183,7 +183,7 @@ object Runner extends EitherLogging {
   }
 
   def runOnSpark(
-      className: String,
+      resourceName: String,
       classArgs: Seq[String],
       sa: SparkArgs,
       extraJars: Seq[URI],
@@ -194,6 +194,10 @@ object Runner extends EitherLogging {
       argumentValue(sa.sparkPassThrough, "--deploy-mode").getOrElse("client")
     val master =
       argumentValue(sa.sparkPassThrough, "--master").getOrElse("local")
+    val isPython = resourceName match {
+      case x if x.endsWith(".py") => true
+      case _ => false
+    }
 
     (sa.scratchUri, deployMode, master) match {
       case (Some(u), "client", m) if m != "yarn-cluster" =>
@@ -219,10 +223,15 @@ object Runner extends EitherLogging {
       sys.env.getOrElse("SPARK_HOME", "."))
 
     // Local path to PredictionIO assembly JAR
-    val mainJar = Common.coreAssembly(pioHome) fold(
-        errStr => return Left(errStr),
-        assembly => handleScratchFile(fs, sa.scratchUri, assembly)
-      )
+    val assemblyJar = Common.coreAssembly(pioHome) fold(
+      errStr => return Left(errStr),
+      assembly => handleScratchFile(fs, sa.scratchUri, assembly)
+    )
+    val mainJar = if(isPython) {
+      resourceName
+    } else {
+      assemblyJar
+    }
 
     // Extra JARs that are needed by the driver
     val driverClassPathPrefix =
@@ -247,8 +256,13 @@ object Runner extends EitherLogging {
     val sparkSubmitCommand =
       Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator))
 
-    val sparkSubmitJarsList = WorkflowUtils.thirdPartyJars ++ deployedJars ++
-      Common.jarFilesForSpark(pioHome).map(_.toURI)
+    val sparkSubmitJarsList = if(isPython) {
+      WorkflowUtils.thirdPartyJars ++ deployedJars ++
+        Common.jarFilesForSpark(pioHome).map(_.toURI) ++ Seq(new URI(assemblyJar))
+    } else {
+      WorkflowUtils.thirdPartyJars ++ deployedJars ++
+        Common.jarFilesForSpark(pioHome).map(_.toURI)
+    }
     val sparkSubmitJars = if (sparkSubmitJarsList.nonEmpty) {
       Seq("--jars", sparkSubmitJarsList.map(_.toString).mkString(","))
     } else {
@@ -275,12 +289,18 @@ object Runner extends EitherLogging {
       Nil
     }
 
+    val className = if(isPython) {
+      Nil
+    } else {
+      Seq("--class", resourceName)
+    }
+
     val verboseArg = if (verbose) Seq("--verbose") else Nil
    val pioLogDir = Option(System.getProperty("pio.log.dir")).getOrElse(s"$pioHome/log")
 
     val sparkSubmitArgs = Seq(
       sa.sparkPassThrough,
-      Seq("--class", className),
+      className,
       sparkSubmitJars,
       sparkSubmitFiles,
       sparkSubmitExtraClasspaths,
@@ -298,11 +318,18 @@ object Runner extends EitherLogging {
       Seq("--env", pioEnvVars),
       verboseArg).flatten.filter(_ != "")
     info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+    val extraEnv: Seq[(String, String)] = if(isPython) {
+      Seq("CLASSPATH" -> "",
+        "SPARK_YARN_USER_ENV" -> pioEnvVars,
+        "PYTHONPATH" -> s"$pioHome/python")
+    } else {
+      Seq("CLASSPATH" -> "",
+        "SPARK_YARN_USER_ENV" -> pioEnvVars)
+    }
     val proc = Process(
       sparkSubmit,
       None,
-      "CLASSPATH" -> "",
-      "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+      extraEnv:_*).run()
     Right((proc, () => cleanup(fs, sa.scratchUri)))
   }
 }

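Taken together, the Runner changes route a .py resource through spark-submit the way pyspark expects: the Python file takes the position previously held by the assembly JAR, the assembly JAR moves into --jars so the JVM half of pypio stays reachable, no --class argument is emitted, and PYTHONPATH is pointed at $pioHome/python so the pypio package resolves on the driver.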
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index acd7598..04df82f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -214,6 +214,9 @@ object Console extends Logging {
           opt[String]("engine-params-key") action { (x, c) =>
             c.copy(workflow = c.workflow.copy(engineParamsKey = Some(x)))
           },
+          opt[String]("main-py-file") action { (x, c) =>
+            c.copy(workflow = c.workflow.copy(mainPyFile = Some(x)))
+          },
           opt[String]("json-extractor") action { (x, c) =>
             c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x)))
           } validate { x =>

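This wires the new code path into the CLI: passing --main-py-file (for example, a hypothetical "pio train --main-py-file train.py") sets WorkflowArgs.mainPyFile, which RunWorkflow then hands to Runner.runOnSpark in place of the CreateWorkflow class name.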