[PIO-116] PySpark Support Closes #427
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/df406bf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/df406bf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/df406bf9

Branch: refs/heads/livedoc
Commit: df406bf92463da4a79c8d84ec0ca439feaa0ec7f
Parents: 69c5e3b
Author: Shinsuke Sugaya <[email protected]>
Authored: Mon Sep 11 12:05:18 2017 +0900
Committer: Shinsuke Sugaya <[email protected]>
Committed: Mon Sep 11 12:05:18 2017 +0900

----------------------------------------------------------------------
 bin/pio-shell                                   |   9 ++
 .../data/store/python/PPythonEventStore.scala   | 146 +++++++++++++++++++
 make-distribution.sh                            |   2 +
 python/pypio/__init__.py                        |  20 +++
 python/pypio/data/__init__.py                   |  25 ++++
 python/pypio/data/eventstore.py                 |  50 +++++++
 python/pypio/shell.py                           |  21 +++
 python/pypio/utils.py                           |  27 ++++
 .../apache/predictionio/tools/RunWorkflow.scala |   7 +-
 .../org/apache/predictionio/tools/Runner.scala  |  47 ++++--
 .../predictionio/tools/console/Console.scala    |   3 +
 11 files changed, 346 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/bin/pio-shell
----------------------------------------------------------------------
diff --git a/bin/pio-shell b/bin/pio-shell
index 5a5745a..cd119cd 100755
--- a/bin/pio-shell
+++ b/bin/pio-shell
@@ -59,6 +59,15 @@ then
   . ${PIO_HOME}/bin/compute-classpath.sh
   shift
   ${SPARK_HOME}/bin/spark-shell --jars ${ASSEMBLY_JARS} $@
+elif [[ "$1" == "--with-pyspark" ]]
+then
+  echo "Starting the PIO shell with PySpark."
+  # Get paths of assembly jars to pass to pyspark
+  . ${PIO_HOME}/bin/compute-classpath.sh
+  shift
+  export PYTHONSTARTUP=${PIO_HOME}/python/pypio/shell.py
+  export PYTHONPATH=${PIO_HOME}/python
+  ${SPARK_HOME}/bin/pyspark --jars ${ASSEMBLY_JARS} $@
 else
   echo -e "\033[0;33mStarting the PIO shell without Apache Spark.\033[0m"
   echo -e "\033[0;33mIf you need the Apache Spark library, run 'pio-shell --with-spark [spark-submit arguments...]'.\033[0m"
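The added branch launches pyspark with PYTHONSTARTUP pointed at python/pypio/shell.py
(added below), so the REPL comes up with a ready-made p_event_store handle. A minimal
session sketch, assuming a PredictionIO app named "MyApp" already holds events ("MyApp"
is a placeholder, not part of this commit):

    $ bin/pio-shell --with-pyspark
    >>> df = p_event_store.find("MyApp")  # pyspark.sql.DataFrame of the app's events
    >>> df.printSchema()
    >>> df.show(5)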
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
new file mode 100644
index 0000000..1d03634
--- /dev/null
+++ b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.store.python
+
+import java.sql.Timestamp
+
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.joda.time.DateTime
+
+
+/** This object provides a set of operations to access the Event Store
+  * with Spark's parallelization
+  */
+object PPythonEventStore {
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType
+    *   (null means no restriction on targetEntityType)
+    * @param targetEntityId return events of this targetEntityId
+    *   (null means no restriction on targetEntityId)
+    * @param spark Spark session
+    * @return DataFrame
+    */
+  def find(
+    appName: String,
+    channelName: String,
+    startTime: Timestamp,
+    untilTime: Timestamp,
+    entityType: String,
+    entityId: String,
+    eventNames: Array[String],
+    targetEntityType: String,
+    targetEntityId: String
+  )(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "eventId",
+        "event",
+        "entityType",
+        "entityId",
+        "targetEntityType",
+        "targetEntityId",
+        "eventTime",
+        "tags",
+        "prId",
+        "creationTime",
+        "fields"
+      )
+    PEventStore.find(appName,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(entityType),
+      Option(entityId),
+      Option(eventNames),
+      Option(targetEntityType).map(Option(_)),
+      Option(targetEntityId).map(Option(_)))(spark.sparkContext).map { e =>
+      (
+        e.eventId,
+        e.event,
+        e.entityType,
+        e.entityId,
+        e.targetEntityType.orNull,
+        e.targetEntityId.orNull,
+        new Timestamp(e.eventTime.getMillis),
+        e.tags.mkString("\t"),
+        e.prId.orNull,
+        new Timestamp(e.creationTime.getMillis),
+        e.properties.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param spark Spark session
+    * @return DataFrame of entityId and PropertyMap pairs
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: String,
+    startTime: Timestamp,
+    untilTime: Timestamp,
+    required: Array[String]
+  )
+  (spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "entityId",
+        "firstUpdated",
+        "lastUpdated",
+        "fields"
+      )
+    PEventStore.aggregateProperties(appName,
+      entityType,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(required).map(_.toSeq))(spark.sparkContext).map { x =>
+      val m = x._2
+      (x._1,
+        new Timestamp(m.firstUpdated.getMillis),
+        new Timestamp(m.lastUpdated.getMillis),
+        m.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index bf4c7ce..7a34274 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -83,6 +83,7 @@ cd ${FWDIR}
 rm -rf ${DISTDIR}
 mkdir -p ${DISTDIR}/bin
 mkdir -p ${DISTDIR}/conf
+mkdir -p ${DISTDIR}/python
 mkdir -p ${DISTDIR}/lib
 mkdir -p ${DISTDIR}/lib/spark
 mkdir -p ${DISTDIR}/project
@@ -91,6 +92,7 @@ mkdir -p ${DISTDIR}/sbt
 
 cp ${FWDIR}/bin/* ${DISTDIR}/bin || :
 cp ${FWDIR}/conf/* ${DISTDIR}/conf
+cp -r ${FWDIR}/python/* ${DISTDIR}/python
 cp ${FWDIR}/project/build.properties ${DISTDIR}/project
 cp ${FWDIR}/sbt/sbt ${DISTDIR}/sbt
 cp ${FWDIR}/assembly/src/universal/lib/*assembly*jar ${DISTDIR}/lib

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/__init__.py
----------------------------------------------------------------------
diff --git a/python/pypio/__init__.py b/python/pypio/__init__.py
new file mode 100644
index 0000000..04d8ac3
--- /dev/null
+++ b/python/pypio/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PyPIO is the Python API for PredictionIO.
+""" http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/data/__init__.py ---------------------------------------------------------------------- diff --git a/python/pypio/data/__init__.py b/python/pypio/data/__init__.py new file mode 100644 index 0000000..63a6442 --- /dev/null +++ b/python/pypio/data/__init__.py @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import + +from pypio.data.eventstore import PEventStore + + +__all__ = [ + 'PEventStore' +] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/data/eventstore.py ---------------------------------------------------------------------- diff --git a/python/pypio/data/eventstore.py b/python/pypio/data/eventstore.py new file mode 100644 index 0000000..4eb73df --- /dev/null +++ b/python/pypio/data/eventstore.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+from __future__ import absolute_import
+
+from pypio.utils import new_string_array
+from pyspark.sql.dataframe import DataFrame
+
+__all__ = ["PEventStore"]
+
+
+class PEventStore(object):
+
+    def __init__(self, jss, sql_ctx):
+        self._jss = jss
+        self.sql_ctx = sql_ctx
+        self._sc = sql_ctx and sql_ctx._sc
+
+    def find(self, app_name, channel_name=None, start_time=None, until_time=None,
+             entity_type=None, entity_id=None, event_names=None, target_entity_type=None,
+             target_entity_id=None):
+        pes = self._sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
+        jdf = pes.find(app_name, channel_name, start_time, until_time, entity_type, entity_id,
+                       new_string_array(event_names, self._sc._gateway),
+                       target_entity_type, target_entity_id, self._jss)
+        return DataFrame(jdf, self.sql_ctx)
+
+    def aggregate_properties(self, app_name, entity_type, channel_name=None,
+                             start_time=None, until_time=None, required=None):
+        pes = self._sc._jvm.org.apache.predictionio.data.store.python.PPythonEventStore
+        jdf = pes.aggregateProperties(app_name, entity_type, channel_name,
+                                      start_time, until_time,
+                                      new_string_array(required, self._sc._gateway),
+                                      self._jss)
+        return DataFrame(jdf, self.sql_ctx)
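For reference, a short sketch of calling this wrapper from the --with-pyspark shell (the
app name, event names, and property names are placeholders; the time-range arguments are
left at their None defaults here, since non-null values must reach the JVM side as
java.sql.Timestamp):

    from pypio.data import PEventStore

    p_event_store = PEventStore(spark._jsparkSession, sqlContext)
    events = p_event_store.find("MyApp", event_names=["$set", "rate"])
    users = p_event_store.aggregate_properties("MyApp", "user", required=["age"])
    users.select("entityId", "firstUpdated", "fields").show(5)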
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/shell.py
----------------------------------------------------------------------
diff --git a/python/pypio/shell.py b/python/pypio/shell.py
new file mode 100644
index 0000000..94c1e1a
--- /dev/null
+++ b/python/pypio/shell.py
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pypio.data import PEventStore
+
+p_event_store = PEventStore(spark._jsparkSession, sqlContext)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/python/pypio/utils.py
----------------------------------------------------------------------
diff --git a/python/pypio/utils.py b/python/pypio/utils.py
new file mode 100644
index 0000000..76900c3
--- /dev/null
+++ b/python/pypio/utils.py
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+def new_string_array(list_data, gateway):
+    if list_data is None:
+        return None
+    string_class = gateway.jvm.String
+    args = gateway.new_array(string_class, len(list_data))
+    for i in range(len(list_data)):
+        args[i] = list_data[i]
+    return args

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index a25f4e0..236d3ba 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -39,6 +39,7 @@ case class WorkflowArgs(
   stopAfterRead: Boolean = false,
   stopAfterPrepare: Boolean = false,
   skipSanityCheck: Boolean = false,
+  mainPyFile: Option[String] = None,
   jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
 
 object RunWorkflow extends Logging {
@@ -85,8 +86,12 @@ object RunWorkflow extends Logging {
       (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++
       Seq("--json-extractor", wa.jsonExtractor.toString)
 
+    val resourceName = wa.mainPyFile match {
+      case Some(x) => x
+      case _ => "org.apache.predictionio.workflow.CreateWorkflow"
+    }
     Runner.runOnSpark(
-      "org.apache.predictionio.workflow.CreateWorkflow",
+      resourceName,
       args,
       sa,
       jarFiles,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
index 4e266c8..8e08b21 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -183,7 +183,7 @@ object Runner extends EitherLogging {
   }
 
   def runOnSpark(
-      className: String,
+      resourceName: String,
       classArgs: Seq[String],
       sa: SparkArgs,
       extraJars: Seq[URI],
@@ -194,6 +194,10 @@
       argumentValue(sa.sparkPassThrough, "--deploy-mode").getOrElse("client")
     val master =
       argumentValue(sa.sparkPassThrough, "--master").getOrElse("local")
+    val isPython = resourceName match {
+      case x if x.endsWith(".py") => true
+      case _ => false
+    }
 
     (sa.scratchUri, deployMode, master) match {
       case (Some(u), "client", m) if m != "yarn-cluster" =>
@@ -219,10 +223,15 @@
         sys.env.getOrElse("SPARK_HOME", "."))
 
       // Local path to PredictionIO assembly JAR
-      val mainJar = Common.coreAssembly(pioHome) fold(
-        errStr => return Left(errStr),
-        assembly => handleScratchFile(fs, sa.scratchUri, assembly)
-      )
+      val assemblyJar = Common.coreAssembly(pioHome) fold(
+        errStr => return Left(errStr),
+        assembly => handleScratchFile(fs, sa.scratchUri, assembly)
+      )
+      val mainJar = if(isPython) {
+        resourceName
+      } else {
+        assemblyJar
+      }
 
       // Extra JARs that are needed by the driver
       val driverClassPathPrefix =
@@ -247,8 +256,13 @@
       val sparkSubmitCommand =
         Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator))
 
-      val sparkSubmitJarsList = WorkflowUtils.thirdPartyJars ++ deployedJars ++
-        Common.jarFilesForSpark(pioHome).map(_.toURI)
+      val sparkSubmitJarsList = if(isPython) {
+        WorkflowUtils.thirdPartyJars ++ deployedJars ++
+          Common.jarFilesForSpark(pioHome).map(_.toURI) ++ Seq(new URI(assemblyJar))
+      } else {
+        WorkflowUtils.thirdPartyJars ++ deployedJars ++
+          Common.jarFilesForSpark(pioHome).map(_.toURI)
+      }
       val sparkSubmitJars = if (sparkSubmitJarsList.nonEmpty) {
         Seq("--jars", sparkSubmitJarsList.map(_.toString).mkString(","))
       } else {
@@ -275,12 +289,18 @@
         Nil
       }
 
+      val className = if(isPython) {
+        Nil
+      } else {
+        Seq("--class", resourceName)
+      }
+
       val verboseArg = if (verbose) Seq("--verbose") else Nil
       val pioLogDir = Option(System.getProperty("pio.log.dir")).getOrElse(s"$pioHome/log")
 
       val sparkSubmitArgs = Seq(
         sa.sparkPassThrough,
-        Seq("--class", className),
+        className,
         sparkSubmitJars,
         sparkSubmitFiles,
         sparkSubmitExtraClasspaths,
@@ -298,11 +318,18 @@
         Seq("--env", pioEnvVars),
         verboseArg).flatten.filter(_ != "")
       info(s"Submission command: ${sparkSubmit.mkString(" ")}")
+      val extraEnv: Seq[(String, String)] = if(isPython) {
+        Seq("CLASSPATH" -> "",
+          "SPARK_YARN_USER_ENV" -> pioEnvVars,
+          "PYTHONPATH" -> s"$pioHome/python")
+      } else {
+        Seq("CLASSPATH" -> "",
+          "SPARK_YARN_USER_ENV" -> pioEnvVars)
+      }
       val proc = Process(
         sparkSubmit,
         None,
-        "CLASSPATH" -> "",
-        "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
+        extraEnv: _*).run()
       Right((proc, () => cleanup(fs, sa.scratchUri)))
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/df406bf9/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
index acd7598..04df82f 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala
@@ -214,6 +214,9 @@ object Console extends Logging {
         opt[String]("engine-params-key") action { (x, c) =>
           c.copy(workflow = c.workflow.copy(engineParamsKey = Some(x)))
         },
+        opt[String]("main-py-file") action { (x, c) =>
+          c.copy(workflow = c.workflow.copy(mainPyFile = Some(x)))
+        },
         opt[String]("json-extractor") action { (x, c) =>
           c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x)))
         } validate { x =>
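Taken together, the tool changes mean `pio train --main-py-file <path>.py` hands the
script straight to spark-submit: Runner drops the --class argument, appends the
PredictionIO assembly to --jars, and exports PYTHONPATH=$PIO_HOME/python. A hypothetical
skeleton for such a script, under those assumptions (the file name train.py and the app
name "MyApp" are placeholders, not part of this commit):

    from pyspark.sql import SparkSession

    # pypio is importable because Runner sets PYTHONPATH to $PIO_HOME/python
    from pypio.data import PEventStore

    spark = SparkSession.builder.getOrCreate()
    # SparkSession._wrapped is the session's SQLContext in PySpark 2.x
    p_event_store = PEventStore(spark._jsparkSession, spark._wrapped)

    events = p_event_store.find("MyApp")
    # ... fit and persist a model from `events` ...
    spark.stop()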
