Repository: predictionio
Updated Branches:
  refs/heads/develop b3fba2eac -> 02377e8dc (forced update)
[PIO-176] Hack in build.sbt for switching between Spark 1.x and 2.x should be cleaned up. Closes #477

Project: http://git-wip-us.apache.org/repos/asf/predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/predictionio/commit/02377e8d
Tree: http://git-wip-us.apache.org/repos/asf/predictionio/tree/02377e8d
Diff: http://git-wip-us.apache.org/repos/asf/predictionio/diff/02377e8d

Branch: refs/heads/develop
Commit: 02377e8dc8bb7a564b9c63ff58b4ca7b0b90c3bf
Parents: 0193c8b
Author: saurabh gulati <saurabh3...@gmail.com>
Authored: Wed Oct 3 23:32:27 2018 -0700
Committer: Donald Szeto <don...@apache.org>
Committed: Wed Oct 3 23:36:23 2018 -0700

----------------------------------------------------------------------
 build.sbt                                       |   2 -
 .../data/store/python/PPythonEventStore.scala   | 146 +++++++++++++++++++
 .../predictionio/data/view/DataView.scala       |   9 +-
 .../data/SparkVersionDependent.scala            |  30 ----
 .../data/store/python/PPythonEventStore.scala   | 146 -------------------
 .../data/storage/jdbc/JDBCPEvents.scala         |   8 +-
 .../tools/export/EventsToFile.scala             |   6 +-
 7 files changed, 153 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 9eea3e0..533fbef 100644
--- a/build.sbt
+++ b/build.sbt
@@ -129,8 +129,6 @@ val data = (project in file("data")).
   settings(commonSettings: _*).
   settings(commonTestSettings: _*).
   enablePlugins(GenJavadocPlugin).
-  settings(unmanagedSourceDirectories in Compile +=
-    sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}").
   disablePlugins(sbtassembly.AssemblyPlugin)
 
 val core = (project in file("core")).
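
The removed setting is the "hack" the title refers to: it compiled an extra, Spark-major-version-specific source tree (e.g. data/src/main/spark-2/) alongside the shared one. A minimal sketch of that mechanism, assuming a project-level sparkVersion setting key and a majorVersion helper as PredictionIO's build defined them (treat both as assumptions, simplified here):

    // build.sbt sketch of the removed pattern, not part of this commit
    val sparkVersion = settingKey[String]("Spark version to compile against")

    def majorVersion(v: String): Int = v.split('.').head.toInt

    val data = (project in file("data")).
      settings(
        // compile data/src/main/spark-1/ or data/src/main/spark-2/ in
        // addition to the shared data/src/main/scala/ tree
        unmanagedSourceDirectories in Compile +=
          sourceDirectory.value / s"main/spark-${majorVersion(sparkVersion.value)}")

With Spark 1.x support gone, a single src/main/scala tree suffices, so the commit moves the one remaining spark-2 source file there and deletes the setting.
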
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
new file mode 100644
index 0000000..1d03634
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/python/PPythonEventStore.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.predictionio.data.store.python
+
+import java.sql.Timestamp
+
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.joda.time.DateTime
+
+
+/** This object provides a set of operations to access Event Store
+  * with Spark's parallelization
+  */
+object PPythonEventStore {
+
+
+  /** Read events from Event Store
+    *
+    * @param appName return events of this app
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param spark Spark session
+    * @return DataFrame
+    */
+  def find(
+    appName: String,
+    channelName: String,
+    startTime: Timestamp,
+    untilTime: Timestamp,
+    entityType: String,
+    entityId: String,
+    eventNames: Array[String],
+    targetEntityType: String,
+    targetEntityId: String
+  )(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "eventId",
+        "event",
+        "entityType",
+        "entityId",
+        "targetEntityType",
+        "targetEntityId",
+        "eventTime",
+        "tags",
+        "prId",
+        "creationTime",
+        "fields"
+      )
+    PEventStore.find(appName,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(entityType),
+      Option(entityId),
+      Option(eventNames),
+      Option(Option(targetEntityType)),
+      Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
+      (
+        e.eventId,
+        e.event,
+        e.entityType,
+        e.entityId,
+        e.targetEntityType.orNull,
+        e.targetEntityId.orNull,
+        new Timestamp(e.eventTime.getMillis),
+        e.tags.mkString("\t"),
+        e.prId.orNull,
+        new Timestamp(e.creationTime.getMillis),
+        e.properties.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+    * \$set, \$unset, \$delete events.
+    *
+    * @param appName use events of this app
+    * @param entityType aggregate properties of the entities of this entityType
+    * @param channelName use events of this channel (default channel if it's None)
+    * @param startTime use events with eventTime >= startTime
+    * @param untilTime use events with eventTime < untilTime
+    * @param required only keep entities with these required properties defined
+    * @param spark Spark session
+    * @return DataFrame of entityId and PropertyMap pairs
+    */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: String,
+    startTime: Timestamp,
+    untilTime: Timestamp,
+    required: Array[String]
+  )
+  (spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    val colNames: Seq[String] =
+      Seq(
+        "entityId",
+        "firstUpdated",
+        "lastUpdated",
+        "fields"
+      )
+    PEventStore.aggregateProperties(appName,
+      entityType,
+      Option(channelName),
+      Option(startTime).map(t => new DateTime(t.getTime)),
+      Option(untilTime).map(t => new DateTime(t.getTime)),
+      Option(required.toSeq))(spark.sparkContext).map { x =>
+      val m = x._2
+      (x._1,
+        new Timestamp(m.firstUpdated.getMillis),
+        new Timestamp(m.lastUpdated.getMillis),
+        m.fields.mapValues(_.values.toString)
+      )
+    }.toDF(colNames: _*)
+  }
+}
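
This API is meant to be driven from PySpark over Py4J, so absent filters arrive as null and are normalized through Option(...). A minimal usage sketch from the Scala side (the app name and filter values are illustrative only; note that a null targetEntityType becomes Some(None), i.e. "events with no target entity", because of the double Option wrapping):

    import org.apache.spark.sql.SparkSession
    import org.apache.predictionio.data.store.python.PPythonEventStore

    val spark = SparkSession.builder().getOrCreate()
    val df = PPythonEventStore.find(
      appName = "MyTestApp",    // hypothetical app name
      channelName = null,       // null -> None -> default channel
      startTime = null,         // null -> no lower time bound
      untilTime = null,         // null -> no upper time bound
      entityType = "user",
      entityId = null,
      eventNames = Array("rate", "buy"),
      targetEntityType = null,  // null -> Some(None): events without a target
      targetEntityId = null
    )(spark)
    df.printSchema()
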
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
index 1c47e10..ca92e8f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -20,14 +20,10 @@ package org.apache.predictionio.data.view
 
 import org.apache.predictionio.annotation.Experimental
 import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.SparkVersionDependent
-
 import grizzled.slf4j.Logger
 import org.apache.predictionio.data.store.PEventStore
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.apache.spark.SparkContext
 import org.joda.time.DateTime
 
@@ -52,7 +48,6 @@ object DataView {
    * @param name identify the DataFrame created
    * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
    *        and update whenever the function is changed.
-   * @param sqlContext SQL context
    * @tparam E the output type of the conversion function. The type needs to extend Product
    *           (e.g. case class)
    * @return a DataFrame of events
@@ -69,7 +64,7 @@ object DataView {
 
     @transient lazy val logger = Logger[this.type]
 
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
 
     val beginTime = startTime match {
       case Some(t) => t
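
The replacement pattern, applied identically in DataView here and in JDBCPEvents and EventsToFile below, relies on builder().getOrCreate() attaching to the SparkSession of the already-running SparkContext rather than constructing a second one. A minimal sketch of that pattern, mirroring the deleted SparkVersionDependent helper shown next and assuming a context is already live:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession

    def sqlSession(sc: SparkContext): SparkSession = {
      // getOrCreate() returns the session bound to the active context, so no
      // Spark-version-dependent indirection is needed once only 2.x is supported.
      SparkSession.builder().getOrCreate()
    }
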
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala b/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
deleted file mode 100644
index 3d07bdf..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/SparkVersionDependent.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-
-object SparkVersionDependent {
-
-  def sqlSession(sc: SparkContext): SparkSession = {
-    SparkSession.builder().getOrCreate()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala b/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
deleted file mode 100644
index 1d03634..0000000
--- a/data/src/main/spark-2/org/apache/predictionio/data/store/python/PPythonEventStore.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.predictionio.data.store.python
-
-import java.sql.Timestamp
-
-import org.apache.predictionio.data.store.PEventStore
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.joda.time.DateTime
-
-
-/** This object provides a set of operations to access Event Store
-  * with Spark's parallelization
-  */
-object PPythonEventStore {
-
-
-  /** Read events from Event Store
-    *
-    * @param appName return events of this app
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param spark Spark session
-    * @return DataFrame
-    */
-  def find(
-    appName: String,
-    channelName: String,
-    startTime: Timestamp,
-    untilTime: Timestamp,
-    entityType: String,
-    entityId: String,
-    eventNames: Array[String],
-    targetEntityType: String,
-    targetEntityId: String
-  )(spark: SparkSession): DataFrame = {
-    import spark.implicits._
-    val colNames: Seq[String] =
-      Seq(
-        "eventId",
-        "event",
-        "entityType",
-        "entityId",
-        "targetEntityType",
-        "targetEntityId",
-        "eventTime",
-        "tags",
-        "prId",
-        "creationTime",
-        "fields"
-      )
-    PEventStore.find(appName,
-      Option(channelName),
-      Option(startTime).map(t => new DateTime(t.getTime)),
-      Option(untilTime).map(t => new DateTime(t.getTime)),
-      Option(entityType),
-      Option(entityId),
-      Option(eventNames),
-      Option(Option(targetEntityType)),
-      Option(Option(targetEntityId)))(spark.sparkContext).map { e =>
-      (
-        e.eventId,
-        e.event,
-        e.entityType,
-        e.entityId,
-        e.targetEntityType.orNull,
-        e.targetEntityId.orNull,
-        new Timestamp(e.eventTime.getMillis),
-        e.tags.mkString("\t"),
-        e.prId.orNull,
-        new Timestamp(e.creationTime.getMillis),
-        e.properties.fields.mapValues(_.values.toString)
-      )
-    }.toDF(colNames: _*)
-  }
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    *
-    * @param appName use events of this app
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param channelName use events of this channel (default channel if it's None)
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param spark Spark session
-    * @return DataFrame of entityId and PropertyMap pairs
-    */
-  def aggregateProperties(
-    appName: String,
-    entityType: String,
-    channelName: String,
-    startTime: Timestamp,
-    untilTime: Timestamp,
-    required: Array[String]
-  )
-  (spark: SparkSession): DataFrame = {
-    import spark.implicits._
-    val colNames: Seq[String] =
-      Seq(
-        "entityId",
-        "firstUpdated",
-        "lastUpdated",
-        "fields"
-      )
-    PEventStore.aggregateProperties(appName,
-      entityType,
-      Option(channelName),
-      Option(startTime).map(t => new DateTime(t.getTime)),
-      Option(untilTime).map(t => new DateTime(t.getTime)),
-      Option(required.toSeq))(spark.sparkContext).map { x =>
-      val m = x._2
-      (x._1,
-        new Timestamp(m.firstUpdated.getMillis),
-        new Timestamp(m.lastUpdated.getMillis),
-        m.fields.mapValues(_.values.toString)
-      )
-    }.toDF(colNames: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index d31e592..4fa8b9f 100644
--- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -20,12 +20,10 @@ package org.apache.predictionio.data.storage.jdbc
 import java.sql.{DriverManager, ResultSet}
 
 import com.github.nscala_time.time.Imports._
-import org.apache.predictionio.data.storage.{
-  DataMap, Event, PEvents, StorageClientConfig}
-import org.apache.predictionio.data.SparkVersionDependent
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.{JdbcRDD, RDD}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.JObject
 import org.json4s.native.Serialization
 import scalikejdbc._
@@ -121,7 +119,7 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String
   }
 
   def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
     import sqlSession.implicits._
 
     val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
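
In write, the session's implicits are what bring rdd.toDF into scope for the subsequent JDBC save. An illustrative, self-contained sketch of that shape (the case class, JDBC URL, and table name are made up and are not PredictionIO's actual row layout):

    import java.util.Properties
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{SaveMode, SparkSession}

    case class EventRow(id: String, event: String) // hypothetical row shape

    def writeRows(sc: SparkContext): Unit = {
      val sqlSession = SparkSession.builder().getOrCreate()
      import sqlSession.implicits._ // provides rdd.toDF
      sc.parallelize(Seq(EventRow("e1", "rate")))
        .toDF()
        .write
        .mode(SaveMode.Append)
        .jdbc("jdbc:postgresql:pio", "example_events", new Properties())
    }
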
http://git-wip-us.apache.org/repos/asf/predictionio/blob/02377e8d/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index 0372a44..9b6dbb5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -21,14 +21,12 @@ package org.apache.predictionio.tools.export
 import org.apache.predictionio.controller.Utils
 import org.apache.predictionio.data.storage.EventJson4sSupport
 import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.SparkVersionDependent
 import org.apache.predictionio.tools.Runner
 import org.apache.predictionio.workflow.WorkflowContext
 import org.apache.predictionio.workflow.WorkflowUtils
 import org.apache.predictionio.workflow.CleanupFunctions
-
 import grizzled.slf4j.Logging
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.json4s.native.Serialization._
 
 case class EventsToFileArgs(
@@ -93,7 +91,7 @@ object EventsToFile extends Logging {
       mode = "Export",
       batch = "App ID " + args.appId + channelStr,
       executorEnv = Runner.envStringToMap(args.env))
-    val sqlSession = SparkVersionDependent.sqlSession(sc)
+    val sqlSession = SparkSession.builder().getOrCreate()
     val events = Storage.getPEvents()
     val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
     val jsonStringRdd = eventsRdd.map(write(_))
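
The hunk shows the setup for the export path: each Event is serialized to a JSON string via the json4s write above. A hedged sketch of how such an RDD of JSON lines can be persisted with the session created in this file (the output path and stand-in data are illustrative, and this is not necessarily the exact write call EventsToFile uses):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.{SaveMode, SparkSession}

    def saveJsonLines(sc: SparkContext, outDir: String): Unit = {
      val sqlSession = SparkSession.builder().getOrCreate()
      import sqlSession.implicits._
      val jsonStringRdd = sc.parallelize(Seq("""{"event":"rate"}""")) // stand-in data
      // one JSON document per line, refusing to clobber an existing directory
      jsonStringRdd.toDF("json").write.mode(SaveMode.ErrorIfExists).text(outDir)
    }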