http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala new file mode 100644 index 0000000..52c8b44 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala @@ -0,0 +1,86 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.App +import org.apache.predictionio.data.storage.Apps +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[Apps]] */ +class JDBCApps(client: String, config: StorageClientConfig, prefix: String) + extends Apps with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "apps") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id serial not null primary key, + name text not null, + description text)""".execute.apply() + } + + def insert(app: App): Option[Int] = DB localTx { implicit session => + val q = if (app.id == 0) { + sql""" + insert into $tableName (name, description) values(${app.name}, ${app.description}) + """ + } else { + sql""" + insert into $tableName values(${app.id}, ${app.name}, ${app.description}) + """ + } + Some(q.updateAndReturnGeneratedKey().apply().toInt) + } + + def get(id: Int): Option[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).single().apply() + } + + def getByName(name: String): Option[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).single().apply() + } + + def getAll(): Seq[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).list().apply() + } + + def update(app: App): Unit = DB localTx { implicit session => + sql""" + update $tableName set name = ${app.name}, description = ${app.description} + where id = ${app.id}""".update().apply() + } + + def delete(id: Int): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } +}
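For orientation, callers do not instantiate JDBCApps directly; they obtain it through the Storage facade, which selects the backend named by the PIO_STORAGE_* configuration. A minimal usage sketch, assuming a JDBC-backed metadata repository (the app name and description are illustrative):

    import org.apache.predictionio.data.storage.{App, Storage}

    object AppsUsageSketch {
      def main(args: Array[String]): Unit = {
        // Resolves to JDBCApps when the metadata repository is JDBC-backed
        val apps = Storage.getMetaDataApps()
        // Passing id = 0 lets the backend auto-generate the key (see insert() above)
        val createdId = apps.insert(App(id = 0, name = "my-app", description = Some("demo app")))
        createdId.foreach(id => println(s"Created app $id"))
        apps.getByName("my-app").foreach(app => println(s"Fetched: $app"))
      }
    }

Note that insert() returns the generated serial key via updateAndReturnGeneratedKey(), which is why the auto-assigned id is available to the caller even when 0 was passed in.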
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala new file mode 100644 index 0000000..f94a64a --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala @@ -0,0 +1,66 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[Channels]] */ +class JDBCChannels(client: String, config: StorageClientConfig, prefix: String) + extends Channels with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "channels") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id serial not null primary key, + name text not null, + appid integer not null)""".execute().apply() + } + + def insert(channel: Channel): Option[Int] = DB localTx { implicit session => + val q = if (channel.id == 0) { + sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})" + } else { + sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})" + } + Some(q.updateAndReturnGeneratedKey().apply().toInt) + } + + def get(id: Int): Option[Channel] = DB localTx { implicit session => + sql"SELECT id, name, appid FROM $tableName WHERE id = $id". + map(resultToChannel).single().apply() + } + + def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session => + sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid". 
+ map(resultToChannel).list().apply() + } + + def delete(id: Int): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + def resultToChannel(rs: WrappedResultSet): Channel = { + Channel( + id = rs.int("id"), + name = rs.string("name"), + appid = rs.int("appid")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala new file mode 100644 index 0000000..a4bd640 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala @@ -0,0 +1,194 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[EngineInstances]] */ +class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String) + extends EngineInstances with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + status text not null, + startTime timestamp DEFAULT CURRENT_TIMESTAMP, + endTime timestamp DEFAULT CURRENT_TIMESTAMP, + engineId text not null, + engineVersion text not null, + engineVariant text not null, + engineFactory text not null, + batch text not null, + env text not null, + sparkConf text not null, + datasourceParams text not null, + preparatorParams text not null, + algorithmsParams text not null, + servingParams text not null)""".execute().apply() + } + + def insert(i: EngineInstance): String = DB localTx { implicit session => + val id = java.util.UUID.randomUUID().toString + sql""" + INSERT INTO $tableName VALUES( + $id, + ${i.status}, + ${i.startTime}, + ${i.endTime}, + ${i.engineId}, + ${i.engineVersion}, + ${i.engineVariant}, + ${i.engineFactory}, + ${i.batch}, + ${JDBCUtils.mapToString(i.env)}, + ${JDBCUtils.mapToString(i.sparkConf)}, + ${i.dataSourceParams}, + ${i.preparatorParams}, + ${i.algorithmsParams}, + ${i.servingParams})""".update().apply() + id + } + + def get(id: String): Option[EngineInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + 
servingParams + FROM $tableName WHERE id = $id""".map(resultToEngineInstance). + single().apply() + } + + def getAll(): Seq[EngineInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + servingParams + FROM $tableName""".map(resultToEngineInstance).list().apply() + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted(engineId, engineVersion, engineVariant).headOption + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + servingParams + FROM $tableName + WHERE + status = 'COMPLETED' AND + engineId = $engineId AND + engineVersion = $engineVersion AND + engineVariant = $engineVariant + ORDER BY startTime DESC""". + map(resultToEngineInstance).list().apply() + } + + def update(i: EngineInstance): Unit = DB localTx { implicit session => + sql""" + update $tableName set + status = ${i.status}, + startTime = ${i.startTime}, + endTime = ${i.endTime}, + engineId = ${i.engineId}, + engineVersion = ${i.engineVersion}, + engineVariant = ${i.engineVariant}, + engineFactory = ${i.engineFactory}, + batch = ${i.batch}, + env = ${JDBCUtils.mapToString(i.env)}, + sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, + datasourceParams = ${i.dataSourceParams}, + preparatorParams = ${i.preparatorParams}, + algorithmsParams = ${i.algorithmsParams}, + servingParams = ${i.servingParams} + where id = ${i.id}""".update().apply() + } + + def delete(id: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + /** Convert JDBC results to [[EngineInstance]] */ + def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = { + EngineInstance( + id = rs.string("id"), + status = rs.string("status"), + startTime = rs.jodaDateTime("startTime"), + endTime = rs.jodaDateTime("endTime"), + engineId = rs.string("engineId"), + engineVersion = rs.string("engineVersion"), + engineVariant = rs.string("engineVariant"), + engineFactory = rs.string("engineFactory"), + batch = rs.string("batch"), + env = JDBCUtils.stringToMap(rs.string("env")), + sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), + dataSourceParams = rs.string("datasourceParams"), + preparatorParams = rs.string("preparatorParams"), + algorithmsParams = rs.string("algorithmsParams"), + servingParams = rs.string("servingParams")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala new file mode 100644 index 0000000..b766689 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineManifests.scala @@ -0,0 +1,111 @@ +/** Copyright 2015 TappingStone, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifests +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[EngineManifests]] */ +class JDBCEngineManifests(client: String, config: StorageClientConfig, prefix: String) + extends EngineManifests with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "enginemanifests") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + version text not null, + engineName text not null, + description text, + files text not null, + engineFactory text not null)""".execute().apply() + } + + def insert(m: EngineManifest): Unit = DB localTx { implicit session => + sql""" + INSERT INTO $tableName VALUES( + ${m.id}, + ${m.version}, + ${m.name}, + ${m.description}, + ${m.files.mkString(",")}, + ${m.engineFactory})""".update().apply() + } + + def get(id: String, version: String): Option[EngineManifest] = DB localTx { implicit session => + sql""" + SELECT + id, + version, + engineName, + description, + files, + engineFactory + FROM $tableName WHERE id = $id AND version = $version""". + map(resultToEngineManifest).single().apply() + } + + def getAll(): Seq[EngineManifest] = DB localTx { implicit session => + sql""" + SELECT + id, + version, + engineName, + description, + files, + engineFactory + FROM $tableName""".map(resultToEngineManifest).list().apply() + } + + def update(m: EngineManifest, upsert: Boolean = false): Unit = { + var r = 0 + DB localTx { implicit session => + r = sql""" + update $tableName set + engineName = ${m.name}, + description = ${m.description}, + files = ${m.files.mkString(",")}, + engineFactory = ${m.engineFactory} + where id = ${m.id} and version = ${m.version}""".update().apply() + } + if (r == 0) { + if (upsert) { + insert(m) + } else { + error("Cannot find a record to update, and upsert is not enabled.") + } + } + } + + def delete(id: String, version: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id AND version = $version". 
+ update().apply() + } + + /** Convert JDBC results to [[EngineManifest]] */ + def resultToEngineManifest(rs: WrappedResultSet): EngineManifest = { + EngineManifest( + id = rs.string("id"), + version = rs.string("version"), + name = rs.string("engineName"), + description = rs.stringOpt("description"), + files = rs.string("files").split(","), + engineFactory = rs.string("engineFactory")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala new file mode 100644 index 0000000..1811271 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala @@ -0,0 +1,162 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EvaluationInstance +import org.apache.predictionio.data.storage.EvaluationInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementations of [[EvaluationInstances]] */ +class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String) + extends EvaluationInstances with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + status text not null, + startTime timestamp DEFAULT CURRENT_TIMESTAMP, + endTime timestamp DEFAULT CURRENT_TIMESTAMP, + evaluationClass text not null, + engineParamsGeneratorClass text not null, + batch text not null, + env text not null, + sparkConf text not null, + evaluatorResults text not null, + evaluatorResultsHTML text not null, + evaluatorResultsJSON text)""".execute().apply() + } + + def insert(i: EvaluationInstance): String = DB localTx { implicit session => + val id = java.util.UUID.randomUUID().toString + sql""" + INSERT INTO $tableName VALUES( + $id, + ${i.status}, + ${i.startTime}, + ${i.endTime}, + ${i.evaluationClass}, + ${i.engineParamsGeneratorClass}, + ${i.batch}, + ${JDBCUtils.mapToString(i.env)}, + ${JDBCUtils.mapToString(i.sparkConf)}, + ${i.evaluatorResults}, + ${i.evaluatorResultsHTML}, + ${i.evaluatorResultsJSON})""".update().apply() + id + } + + def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + 
FROM $tableName WHERE id = $id + """.map(resultToEvaluationInstance).single().apply() + } + + def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + FROM $tableName + """.map(resultToEvaluationInstance).list().apply() + } + + def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + FROM $tableName + WHERE + status = 'EVALCOMPLETED' + ORDER BY starttime DESC + """.map(resultToEvaluationInstance).list().apply() + } + + def update(i: EvaluationInstance): Unit = DB localTx { implicit session => + sql""" + update $tableName set + status = ${i.status}, + startTime = ${i.startTime}, + endTime = ${i.endTime}, + evaluationClass = ${i.evaluationClass}, + engineParamsGeneratorClass = ${i.engineParamsGeneratorClass}, + batch = ${i.batch}, + env = ${JDBCUtils.mapToString(i.env)}, + sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, + evaluatorResults = ${i.evaluatorResults}, + evaluatorResultsHTML = ${i.evaluatorResultsHTML}, + evaluatorResultsJSON = ${i.evaluatorResultsJSON} + where id = ${i.id}""".update().apply() + } + + def delete(id: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + /** Convert JDBC results to [[EvaluationInstance]] */ + def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = { + EvaluationInstance( + id = rs.string("id"), + status = rs.string("status"), + startTime = rs.jodaDateTime("startTime"), + endTime = rs.jodaDateTime("endTime"), + evaluationClass = rs.string("evaluationClass"), + engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"), + batch = rs.string("batch"), + env = JDBCUtils.stringToMap(rs.string("env")), + sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), + evaluatorResults = rs.string("evaluatorResults"), + evaluatorResultsHTML = rs.string("evaluatorResultsHTML"), + evaluatorResultsJSON = rs.string("evaluatorResultsJSON")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala new file mode 100644 index 0000000..945879c --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala @@ -0,0 +1,241 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.StorageClientConfig +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.json4s.JObject +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import scalikejdbc._ + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +/** JDBC implementation of [[LEvents]] */ +class JDBCLEvents( + client: String, + config: StorageClientConfig, + namespace: String) extends LEvents with Logging { + implicit private val formats = org.json4s.DefaultFormats + + def init(appId: Int, channelId: Option[Int] = None): Boolean = { + + // To use index, it must be varchar less than 255 characters on a VARCHAR column + val useIndex = config.properties.contains("INDEX") && + config.properties("INDEX").equalsIgnoreCase("enabled") + + val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) + val entityIdIndexName = s"idx_${tableName}_ei" + val entityTypeIndexName = s"idx_${tableName}_et" + DB autoCommit { implicit session => + if (useIndex) { + SQL(s""" + create table if not exists $tableName ( + id varchar(32) not null primary key, + event varchar(255) not null, + entityType varchar(255) not null, + entityId varchar(255) not null, + targetEntityType text, + targetEntityId text, + properties text, + eventTime timestamp DEFAULT CURRENT_TIMESTAMP, + eventTimeZone varchar(50) not null, + tags text, + prId text, + creationTime timestamp DEFAULT CURRENT_TIMESTAMP, + creationTimeZone varchar(50) not null)""").execute().apply() + + // create index + SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply() + SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply() + } else { + SQL(s""" + create table if not exists $tableName ( + id varchar(32) not null primary key, + event text not null, + entityType text not null, + entityId text not null, + targetEntityType text, + targetEntityId text, + properties text, + eventTime timestamp DEFAULT CURRENT_TIMESTAMP, + eventTimeZone varchar(50) not null, + tags text, + prId text, + creationTime timestamp DEFAULT CURRENT_TIMESTAMP, + creationTimeZone varchar(50) not null)""").execute().apply() + } + true + } + } + + def remove(appId: Int, channelId: Option[Int] = None): Boolean = + DB autoCommit { implicit session => + SQL(s""" + drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)} + """).execute().apply() + true + } + + def close(): Unit = ConnectionPool.closeAll() + + def futureInsert(event: Event, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[String] = Future { + DB localTx { implicit session => + val id = event.eventId.getOrElse(JDBCUtils.generateId) + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + insert into $tableName values( + $id, + ${event.event}, + ${event.entityType}, + ${event.entityId}, + ${event.targetEntityType}, + ${event.targetEntityId}, + ${write(event.properties.toJObject)}, + ${event.eventTime}, + ${event.eventTime.getZone.getID}, + ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None}, + ${event.prId}, + ${event.creationTime}, + ${event.creationTime.getZone.getID} + ) + """.update().apply() + id + } + } + + def 
futureGet(eventId: String, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[Option[Event]] = Future { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + select + id, + event, + entityType, + entityId, + targetEntityType, + targetEntityId, + properties, + eventTime, + eventTimeZone, + tags, + prId, + creationTime, + creationTimeZone + from $tableName + where id = $eventId + """.map(resultToEvent).single().apply() + } + } + + def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[Boolean] = Future { + DB localTx { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + delete from $tableName where id = $eventId + """.update().apply() + true + } + } + + def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None + )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + val whereClause = sqls.toAndConditionOpt( + startTime.map(x => sqls"eventTime >= $x"), + untilTime.map(x => sqls"eventTime < $x"), + entityType.map(x => sqls"entityType = $x"), + entityId.map(x => sqls"entityId = $x"), + eventNames.map(x => + sqls.toOrConditionOpt(x.map(y => + Some(sqls"event = $y") + ): _*) + ).getOrElse(None), + targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y") + .getOrElse(sqls"targetEntityType IS NULL")), + targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y") + .getOrElse(sqls"targetEntityId IS NULL")) + ).map(sqls.where(_)).getOrElse(sqls"") + val orderByClause = reversed.map(x => + if (x) sqls"eventTime desc" else sqls"eventTime asc" + ).getOrElse(sqls"eventTime asc") + val limitClause = limit.map(x => + if (x < 0) sqls"" else sqls.limit(x) + ).getOrElse(sqls"") + val q = sql""" + select + id, + event, + entityType, + entityId, + targetEntityType, + targetEntityId, + properties, + eventTime, + eventTimeZone, + tags, + prId, + creationTime, + creationTimeZone + from $tableName + $whereClause + order by $orderByClause + $limitClause + """ + q.map(resultToEvent).list().apply().toIterator + } + } + + private[prediction] def resultToEvent(rs: WrappedResultSet): Event = { + Event( + eventId = rs.stringOpt("id"), + event = rs.string("event"), + entityType = rs.string("entityType"), + entityId = rs.string("entityId"), + targetEntityType = rs.stringOpt("targetEntityType"), + targetEntityId = rs.stringOpt("targetEntityId"), + properties = rs.stringOpt("properties").map(p => + DataMap(read[JObject](p))).getOrElse(DataMap()), + eventTime = new DateTime(rs.jodaDateTime("eventTime"), + DateTimeZone.forID(rs.string("eventTimeZone"))), + tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil), + prId = rs.stringOpt("prId"), + creationTime = new DateTime(rs.jodaDateTime("creationTime"), + DateTimeZone.forID(rs.string("creationTimeZone"))) + ) + } +} 
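Two details of futureFind above are easy to miss: the optional filters are ANDed together with sqls.toAndConditionOpt, and the target-entity filters are doubly wrapped Options (None means unrestricted, Some(None) becomes an IS NULL test, Some(Some(x)) becomes an equality test). A minimal sketch of exercising the DAO through the Storage facade, assuming a JDBC-backed event repository (the app id and event names are illustrative):

    import org.apache.predictionio.data.storage.Storage
    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object LEventsUsageSketch {
      def main(args: Array[String]): Unit = {
        val events = Storage.getLEvents()
        events.init(appId = 1)  // create table if not exists (see init() above)
        val found = Await.result(
          events.futureFind(
            appId = 1,
            eventNames = Some(Seq("view", "buy")),  // ORed: (event = 'view' or event = 'buy')
            targetEntityType = Some(None),          // becomes: targetEntityType IS NULL
            limit = Some(10),
            reversed = Some(true)),                 // order by eventTime desc
          10.seconds)
        found.foreach(println)
        events.close()
      }
    }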
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala new file mode 100644 index 0000000..01ed6ca --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala @@ -0,0 +1,52 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Model +import org.apache.predictionio.data.storage.Models +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[Models]] */ +class JDBCModels(client: String, config: StorageClientConfig, prefix: String) + extends Models with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "models") + + /** Determines binary column type based on JDBC driver type */ + val binaryColumnType = JDBCUtils.binaryColumnType(client) + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + models $binaryColumnType not null)""".execute().apply() + } + + def insert(i: Model): Unit = DB localTx { implicit session => + sql"insert into $tableName values(${i.id}, ${i.models})".update().apply() + } + + def get(id: String): Option[Model] = DB readOnly { implicit session => + sql"select id, models from $tableName where id = $id".map { r => + Model(id = r.string("id"), models = r.bytes("models")) + }.single().apply() + } + + def delete(id: String): Unit = DB localTx { implicit session => + sql"delete from $tableName where id = $id".execute().apply() + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala new file mode 100644 index 0000000..c01989c --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala @@ -0,0 +1,160 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import java.sql.{DriverManager, ResultSet} + +import com.github.nscala_time.time.Imports._ +import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.{JdbcRDD, RDD} +import org.apache.spark.sql.{SQLContext, SaveMode} +import org.json4s.JObject +import org.json4s.native.Serialization + +/** JDBC implementation of [[PEvents]] */ +class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents { + @transient private implicit lazy val formats = org.json4s.DefaultFormats + def find( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = { + val lower = startTime.map(_.getMillis).getOrElse(0.toLong) + /** Change the default upper bound from +100 to +1 year because MySQL's + * FROM_UNIXTIME(t) will return NULL if we use +100 years. + */ + val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis) + val par = scala.math.min( + new Duration(upper - lower).getStandardDays, + config.properties.getOrElse("PARTITIONS", "4").toLong).toInt + val entityTypeClause = entityType.map(x => s"and entityType = '$x'").getOrElse("") + val entityIdClause = entityId.map(x => s"and entityId = '$x'").getOrElse("") + val eventNamesClause = + eventNames.map("and (" + _.map(y => s"event = '$y'").mkString(" or ") + ")").getOrElse("") + val targetEntityTypeClause = targetEntityType.map( + _.map(x => s"and targetEntityType = '$x'" + ).getOrElse("and targetEntityType is null")).getOrElse("") + val targetEntityIdClause = targetEntityId.map( + _.map(x => s"and targetEntityId = '$x'" + ).getOrElse("and targetEntityId is null")).getOrElse("") + val q = s""" + select + id, + event, + entityType, + entityId, + targetEntityType, + targetEntityId, + properties, + eventTime, + eventTimeZone, + tags, + prId, + creationTime, + creationTimeZone + from ${JDBCUtils.eventTableName(namespace, appId, channelId)} + where + eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and + eventTime < ${JDBCUtils.timestampFunction(client)}(?) 
+ $entityTypeClause + $entityIdClause + $eventNamesClause + $targetEntityTypeClause + $targetEntityIdClause + """.replace("\n", " ") + new JdbcRDD( + sc, + () => { + DriverManager.getConnection( + client, + config.properties("USERNAME"), + config.properties("PASSWORD")) + }, + q, + lower / 1000, + upper / 1000, + par, + (r: ResultSet) => { + Event( + eventId = Option(r.getString("id")), + event = r.getString("event"), + entityType = r.getString("entityType"), + entityId = r.getString("entityId"), + targetEntityType = Option(r.getString("targetEntityType")), + targetEntityId = Option(r.getString("targetEntityId")), + properties = Option(r.getString("properties")).map(x => + DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()), + eventTime = new DateTime(r.getTimestamp("eventTime").getTime, + DateTimeZone.forID(r.getString("eventTimeZone"))), + tags = Option(r.getString("tags")).map(x => + x.split(",").toList).getOrElse(Nil), + prId = Option(r.getString("prId")), + creationTime = new DateTime(r.getTimestamp("creationTime").getTime, + DateTimeZone.forID(r.getString("creationTimeZone")))) + }).cache() + } + + def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { + val sqlContext = new SQLContext(sc) + + import sqlContext.implicits._ + + val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) + + val eventTableColumns = Seq[String]( + "id" + , "event" + , "entityType" + , "entityId" + , "targetEntityType" + , "targetEntityId" + , "properties" + , "eventTime" + , "eventTimeZone" + , "tags" + , "prId" + , "creationTime" + , "creationTimeZone") + + val eventDF = events.map { event => + (event.eventId.getOrElse(JDBCUtils.generateId) + , event.event + , event.entityType + , event.entityId + , event.targetEntityType.orNull + , event.targetEntityId.orNull + , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null + , new java.sql.Timestamp(event.eventTime.getMillis) + , event.eventTime.getZone.getID + , if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else null + , event.prId + , new java.sql.Timestamp(event.creationTime.getMillis) + , event.creationTime.getZone.getID) + }.toDF(eventTableColumns:_*) + + // spark version 1.4.0 or higher + val prop = new java.util.Properties + prop.setProperty("user", config.properties("USERNAME")) + prop.setProperty("password", config.properties("PASSWORD")) + eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala new file mode 100644 index 0000000..e95b49b --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala @@ -0,0 +1,103 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import scalikejdbc._ + +/** JDBC related utilities */ +object JDBCUtils { + /** Extract JDBC driver type from URL + * + * @param url JDBC URL + * @return The driver type, e.g. postgresql + */ + def driverType(url: String): String = { + val capture = """jdbc:([^:]+):""".r + capture findFirstIn url match { + case Some(capture(driverType)) => driverType + case None => "" + } + } + + /** Determines binary column type from JDBC URL + * + * @param url JDBC URL + * @return Binary column type as SQLSyntax, e.g. LONGBLOB + */ + def binaryColumnType(url: String): SQLSyntax = { + driverType(url) match { + case "postgresql" => sqls"bytea" + case "mysql" => sqls"longblob" + case _ => sqls"longblob" + } + } + + /** Determines UNIX timestamp conversion function from JDBC URL + * + * @param url JDBC URL + * @return Timestamp conversion function, e.g. TO_TIMESTAMP + */ + def timestampFunction(url: String): String = { + driverType(url) match { + case "postgresql" => "to_timestamp" + case "mysql" => "from_unixtime" + case _ => "from_unixtime" + } + } + + /** Converts Map of String to String to comma-separated list of key=value + * + * @param m Map of String to String + * @return Comma-separated list, e.g. FOO=BAR,X=Y,... + */ + def mapToString(m: Map[String, String]): String = { + m.map(t => s"${t._1}=${t._2}").mkString(",") + } + + /** Inverse of mapToString + * + * @param str Comma-separated list, e.g. FOO=BAR,X=Y,... + * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...) + */ + def stringToMap(str: String): Map[String, String] = { + str.split(",").map { x => + val y = x.split("=") + y(0) -> y(1) + }.toMap[String, String] + } + + /** Generate 32-character random ID using UUID with - stripped */ + def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "") + + /** Prefix a table name + * + * @param prefix Table prefix + * @param table Table name + * @return Prefixed table name + */ + def prefixTableName(prefix: String, table: String): SQLSyntax = + sqls.createUnsafely(s"${prefix}_$table") + + /** Derive event table name + * + * @param namespace Namespace of event tables + * @param appId App ID + * @param channelId Optional channel ID + * @return Full event table name + */ + def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String = + s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}" +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala new file mode 100644 index 0000000..6015870 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala @@ -0,0 +1,50 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.BaseStorageClient +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.StorageClientException +import scalikejdbc._ + +/** JDBC implementation of [[BaseStorageClient]] */ +class StorageClient(val config: StorageClientConfig) + extends BaseStorageClient with Logging { + override val prefix = "JDBC" + + if (!config.properties.contains("URL")) { + throw new StorageClientException("The URL variable is not set!", null) + } + if (!config.properties.contains("USERNAME")) { + throw new StorageClientException("The USERNAME variable is not set!", null) + } + if (!config.properties.contains("PASSWORD")) { + throw new StorageClientException("The PASSWORD variable is not set!", null) + } + + // set max size of connection pool + val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt + val settings = ConnectionPoolSettings(maxSize = maxSize) + + ConnectionPool.singleton( + config.properties("URL"), + config.properties("USERNAME"), + config.properties("PASSWORD"), + settings) + /** JDBC connection URL. Connections are managed by ScalikeJDBC. */ + val client = config.properties("URL") +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala new file mode 100644 index 0000000..c423b29 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala @@ -0,0 +1,23 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage + +/** JDBC implementation of storage traits, supporting meta data, event data, and + * model data + * + * @group Implementation + */ +package object jdbc {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala new file mode 100644 index 0000000..82989aa --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala @@ -0,0 +1,59 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.localfs + +import java.io.File +import java.io.FileNotFoundException +import java.io.FileOutputStream + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Model +import org.apache.predictionio.data.storage.Models +import org.apache.predictionio.data.storage.StorageClientConfig + +import scala.io.Source + +class LocalFSModels(f: File, config: StorageClientConfig, prefix: String) + extends Models with Logging { + + def insert(i: Model): Unit = { + try { + val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}")) + fos.write(i.models) + fos.close + } catch { + case e: FileNotFoundException => error(e.getMessage) + } + } + + def get(id: String): Option[Model] = { + try { + Some(Model( + id = id, + models = Source.fromFile(new File(f, s"${prefix}${id}"))( + scala.io.Codec.ISO8859).map(_.toByte).toArray)) + } catch { + case e: Throwable => + error(e.getMessage) + None + } + } + + def delete(id: String): Unit = { + val m = new File(f, s"${prefix}${id}") + if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!") + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala new file mode 100644 index 0000000..8206384 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala @@ -0,0 +1,43 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage.localfs + +import java.io.File + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.BaseStorageClient +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.predictionio.data.storage.StorageClientException + +class StorageClient(val config: StorageClientConfig) extends BaseStorageClient + with Logging { + override val prefix = "LocalFS" + val f = new File( + config.properties.getOrElse("PATH", config.properties("HOSTS"))) + if (f.exists) { + if (!f.isDirectory) throw new StorageClientException( + s"${f} already exists but it is not a directory!", + null) + if (!f.canWrite) throw new StorageClientException( + s"${f} already exists but it is not writable!", + null) + } else { + if (!f.mkdirs) throw new StorageClientException( + s"${f} does not exist and automatic creation failed!", + null) + } + val client = f +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala new file mode 100644 index 0000000..f245a06 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala @@ -0,0 +1,22 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +/** Local file system implementation of storage traits, supporting model data only + * + * @group Implementation + */ +package object localfs {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/storage/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala new file mode 100644 index 0000000..09e6fa3 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/storage/package.scala @@ -0,0 +1,26 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data + +/** If you are an engine developer, please refer to the [[store]] package. + * + * This package provides convenient access to underlying data access objects. + * The common entry point is [[Storage]]. + * + * Developer APIs are available to advanced developers to add support of other + * data store backends. + */ +package object storage {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/Common.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/store/Common.scala b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala new file mode 100644 index 0000000..81b4b28 --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/store/Common.scala @@ -0,0 +1,50 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.store + +import org.apache.predictionio.data.storage.Storage +import grizzled.slf4j.Logger + +private[prediction] object Common { + + @transient lazy val logger = Logger[this.type] + @transient lazy private val appsDb = Storage.getMetaDataApps() + @transient lazy private val channelsDb = Storage.getMetaDataChannels() + + /* throw exception if invalid app name or channel name */ + def appNameToId(appName: String, channelName: Option[String]): (Int, Option[Int]) = { + val appOpt = appsDb.getByName(appName) + + appOpt.map { app => + val channelMap: Map[String, Int] = channelsDb.getByAppid(app.id) + .map(c => (c.name, c.id)).toMap + + val channelId: Option[Int] = channelName.map { ch => + if (channelMap.contains(ch)) { + channelMap(ch) + } else { + logger.error(s"Invalid channel name ${ch}.") + throw new IllegalArgumentException(s"Invalid channel name ${ch}.") + } + } + + (app.id, channelId) + }.getOrElse { + logger.error(s"Invalid app name ${appName}") + throw new IllegalArgumentException(s"Invalid app name ${appName}") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala new file mode 100644 index 0000000..ae38e7b --- /dev/null +++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala @@ -0,0 +1,142 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.store + +import org.apache.predictionio.data.storage.Storage +import org.apache.predictionio.data.storage.Event + +import org.joda.time.DateTime + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration + +/** This object provides a set of operation to access Event Store + * without going through Spark's parallelization + */ +object LEventStore { + + private val defaultTimeout = Duration(60, "seconds") + + @transient lazy private val eventsDb = Storage.getLEvents() + + /** Reads events of the specified entity. May use this in Algorithm's predict() + * or Serving logic to have fast event store access. + * + * @param appName return events of this app + * @param entityType return events of this entityType + * @param entityId return events of this entityId + * @param channelName return events of this channel (default channel if it's None) + * @param eventNames return events with any of these event names. + * @param targetEntityType return events of this targetEntityType: + * - None means no restriction on targetEntityType + * - Some(None) means no targetEntityType for this event + * - Some(Some(x)) means targetEntityType should match x. 
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param limit limit the number of events. Get all events if None or Some(-1)
+   * @param latest return the latest event first (default true)
+   * @return Iterator[Event]
+   */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    Await.result(eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = Some(entityType),
+      entityId = Some(entityId),
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit,
+      reversed = Some(latest)),
+      timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+   * results in a full table scan.
+   *
+   * @param appName return events of this app
+   * @param entityType return events of this entityType
+   *   - None means no restriction on entityType
+   *   - Some(x) means entityType should match x.
+   * @param entityId return events of this entityId
+   *   - None means no restriction on entityId
+   *   - Some(x) means entityId should match x.
+   * @param channelName return events of this channel (default channel if it's None)
+   * @param eventNames return events with any of these event names.
+   * @param targetEntityType return events of this targetEntityType:
+   *   - None means no restriction on targetEntityType
+   *   - Some(None) means no targetEntityType for this event
+   *   - Some(Some(x)) means targetEntityType should match x.
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param limit limit the number of events. Get all events if None or Some(-1)
+   * @return Iterator[Event]
+   */
+  def find(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    timeout: Duration = defaultTimeout): Iterator[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    Await.result(eventsDb.futureFind(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = entityType,
+      entityId = entityId,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      limit = limit), timeout)
+  }
+
+}
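For reference, a minimal usage sketch of the two LEventStore read paths above (not part of this commit; the app name "MyApp", the entity IDs, and the event names are hypothetical):

import org.apache.predictionio.data.store.LEventStore
import scala.concurrent.duration._

// Fast per-entity lookup, suitable inside predict() or Serving logic:
// the ten most recent "view"/"buy" events of user "u1", newest first.
val recent = LEventStore.findByEntity(
  appName = "MyApp",
  entityType = "user",
  entityId = "u1",
  eventNames = Some(Seq("view", "buy")),
  limit = Some(10),
  timeout = 5.seconds)

// Generic query without an entityId; note this may scan the whole table.
val allBuys = LEventStore.find(
  appName = "MyApp",
  entityType = Some("user"),
  eventNames = Some(Seq("buy")))

recent.foreach(e => println(s"${e.event} -> ${e.targetEntityId.getOrElse("-")}"))

Both calls block for at most the given timeout (60 seconds by default) while the underlying futureFind completes.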
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
new file mode 100644
index 0000000..b8f0037
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/PEventStore.scala
@@ -0,0 +1,116 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store
+
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+
+import org.joda.time.DateTime
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** This object provides a set of operations to access the Event Store
+ * with Spark's parallelization.
+ */
+object PEventStore {
+
+  @transient lazy private val eventsDb = Storage.getPEvents()
+
+  /** Read events from the Event Store.
+   *
+   * @param appName return events of this app
+   * @param channelName return events of this channel (default channel if it's None)
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param entityType return events of this entityType
+   * @param entityId return events of this entityId
+   * @param eventNames return events with any of these event names.
+   * @param targetEntityType return events of this targetEntityType:
+   *   - None means no restriction on targetEntityType
+   *   - Some(None) means no targetEntityType for this event
+   *   - Some(Some(x)) means targetEntityType should match x.
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param sc Spark context
+   * @return RDD[Event]
+   */
+  def find(
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None
+  )(sc: SparkContext): RDD[Event] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    eventsDb.find(
+      appId = appId,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = entityType,
+      entityId = entityId,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId
+    )(sc)
+
+  }
+
+  /** Aggregate properties of entities based on these special events:
+   * \$set, \$unset, \$delete events.
+   *
+   * @param appName use events of this app
+   * @param entityType aggregate properties of the entities of this entityType
+   * @param channelName use events of this channel (default channel if it's None)
+   * @param startTime use events with eventTime >= startTime
+   * @param untilTime use events with eventTime < untilTime
+   * @param required only keep entities with these required properties defined
+   * @param sc Spark context
+   * @return RDD[(String, PropertyMap)] RDD of (entityId, PropertyMap) pairs
+   */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    required: Option[Seq[String]] = None)
+    (sc: SparkContext): RDD[(String, PropertyMap)] = {
+
+    val (appId, channelId) = Common.appNameToId(appName, channelName)
+
+    eventsDb.aggregateProperties(
+      appId = appId,
+      entityType = entityType,
+      channelId = channelId,
+      startTime = startTime,
+      untilTime = untilTime,
+      required = required
+    )(sc)
+
+  }
+
+}
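A hedged sketch of how an engine's data preparation might use PEventStore; it assumes a live SparkContext, and "MyApp", "rate", "user", "item" and "title" are hypothetical names:

import org.apache.predictionio.data.store.PEventStore
import org.apache.spark.SparkContext

def readTrainingData(sc: SparkContext) = {
  // All "rate" events from users to items.
  val rateEvents = PEventStore.find(
    appName = "MyApp",
    entityType = Some("user"),
    eventNames = Some(Seq("rate")),
    targetEntityType = Some(Some("item")))(sc)

  // Current properties of every "item", folded from $set/$unset/$delete
  // events; entities without a "title" property are dropped.
  val items = PEventStore.aggregateProperties(
    appName = "MyApp",
    entityType = "item",
    required = Some(Seq("title")))(sc)

  (rateEvents, items)
}

PJavaEventStore below exposes the same two operations to Java-based engines.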
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
new file mode 100644
index 0000000..fa14daf
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -0,0 +1,142 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.LEventStore
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+import scala.concurrent.duration.Duration
+
+/** This Java-friendly object provides a set of operations to access the
+ * Event Store without going through Spark's parallelization.
+ */
+object LJavaEventStore {
+
+  /** Reads events of the specified entity. You may use this in an Algorithm's
+   * predict() or in Serving logic for fast event store access.
+   *
+   * @param appName return events of this app
+   * @param entityType return events of this entityType
+   * @param entityId return events of this entityId
+   * @param channelName return events of this channel (default channel if it's None)
+   * @param eventNames return events with any of these event names.
+   * @param targetEntityType return events of this targetEntityType:
+   *   - None means no restriction on targetEntityType
+   *   - Some(None) means no targetEntityType for this event
+   *   - Some(Some(x)) means targetEntityType should match x.
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param limit limit the number of events. Get all events if None or Some(-1)
+   * @param latest return the latest event first
+   * @return java.util.List[Event]
+   */
+  def findByEntity(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    timeout: Duration): java.util.List[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+
+    JavaConversions.seqAsJavaList(
+      LEventStore.findByEntity(
+        appName,
+        entityType,
+        entityId,
+        channelName,
+        eventNamesSeq,
+        targetEntityType,
+        targetEntityId,
+        startTime,
+        untilTime,
+        limitInt,
+        latest,
+        timeout
+      ).toSeq)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+   * results in a full table scan.
+   *
+   * @param appName return events of this app
+   * @param entityType return events of this entityType
+   *   - None means no restriction on entityType
+   *   - Some(x) means entityType should match x.
+   * @param entityId return events of this entityId
+   *   - None means no restriction on entityId
+   *   - Some(x) means entityId should match x.
+   * @param channelName return events of this channel (default channel if it's None)
+   * @param eventNames return events with any of these event names.
+   * @param targetEntityType return events of this targetEntityType:
+   *   - None means no restriction on targetEntityType
+   *   - Some(None) means no targetEntityType for this event
+   *   - Some(Some(x)) means targetEntityType should match x.
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param limit limit the number of events. Get all events if None or Some(-1)
+   * @return java.util.List[Event]
+   */
+  def find(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    timeout: Duration): java.util.List[Event] = {

+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+
+    JavaConversions.seqAsJavaList(
+      LEventStore.find(
+        appName,
+        entityType,
+        entityId,
+        channelName,
+        eventNamesSeq,
+        targetEntityType,
+        targetEntityId,
+        startTime,
+        untilTime,
+        limitInt,
+        timeout
+      ).toSeq)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
new file mode 100644
index 0000000..b6d174b
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/OptionHelper.scala
@@ -0,0 +1,29 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+/** Used by Java-based engines to construct Some and None values */
+object OptionHelper {
+  /** Mimics a None from a Java-based engine */
+  def none[T]: Option[T] = {
+    None
+  }
+
+  /** Mimics a Some from a Java-based engine */
+  def some[T](value: T): Option[T] = {
+    Some(value)
+  }
+}
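A short sketch of how OptionHelper spares Java callers from constructing Scala Option values by hand; it is written in Scala for compactness here, but a Java engine would make the same calls ("MyApp", "user", "u1" and "view" are hypothetical):

import org.apache.predictionio.data.store.java.{LJavaEventStore, OptionHelper}
import org.joda.time.DateTime
import scala.concurrent.duration.Duration

val events = LJavaEventStore.findByEntity(
  "MyApp",                                            // appName
  "user",                                             // entityType
  "u1",                                               // entityId
  OptionHelper.none[String],                          // channelName
  OptionHelper.some(java.util.Arrays.asList("view")), // eventNames
  OptionHelper.none[Option[String]],                  // targetEntityType
  OptionHelper.none[Option[String]],                  // targetEntityId
  OptionHelper.none[DateTime],                        // startTime
  OptionHelper.none[DateTime],                        // untilTime
  OptionHelper.some(Integer.valueOf(10)),             // limit
  true,                                               // latest
  Duration("5 seconds"))                              // timeout

Note that none[T] returns None unconditionally; an Option(null.asInstanceOf[T]) formulation would yield Some(0) or Some(false) for primitive T because of unboxing.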
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
new file mode 100644
index 0000000..c47032c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/PJavaEventStore.scala
@@ -0,0 +1,109 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.store.java
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PropertyMap
+import org.apache.predictionio.data.store.PEventStore
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDD
+import org.joda.time.DateTime
+
+import scala.collection.JavaConversions
+
+/** This Java-friendly object provides a set of operations to access the
+ * Event Store with Spark's parallelization.
+ */
+object PJavaEventStore {
+
+  /** Read events from the Event Store.
+   *
+   * @param appName return events of this app
+   * @param channelName return events of this channel (default channel if it's None)
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param entityType return events of this entityType
+   * @param entityId return events of this entityId
+   * @param eventNames return events with any of these event names.
+   * @param targetEntityType return events of this targetEntityType:
+   *   - None means no restriction on targetEntityType
+   *   - Some(None) means no targetEntityType for this event
+   *   - Some(Some(x)) means targetEntityType should match x.
+   * @param targetEntityId return events of this targetEntityId
+   *   - None means no restriction on targetEntityId
+   *   - Some(None) means no targetEntityId for this event
+   *   - Some(Some(x)) means targetEntityId should match x.
+   * @param sc Spark context
+   * @return JavaRDD[Event]
+   */
+  def find(
+    appName: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    entityType: Option[String],
+    entityId: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    sc: SparkContext): JavaRDD[Event] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    PEventStore.find(
+      appName,
+      channelName,
+      startTime,
+      untilTime,
+      entityType,
+      entityId,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId
+    )(sc)
+  }
+
+  /** Aggregate properties of entities based on these special events:
+   * \$set, \$unset, \$delete events.
+   *
+   * @param appName use events of this app
+   * @param entityType aggregate properties of the entities of this entityType
+   * @param channelName use events of this channel (default channel if it's None)
+   * @param startTime use events with eventTime >= startTime
+   * @param untilTime use events with eventTime < untilTime
+   * @param required only keep entities with these required properties defined
+   * @param sc Spark context
+   * @return JavaRDD[(String, PropertyMap)] JavaRDD of (entityId, PropertyMap) pairs
+   */
+  def aggregateProperties(
+    appName: String,
+    entityType: String,
+    channelName: Option[String],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    required: Option[java.util.List[String]],
+    sc: SparkContext): JavaRDD[(String, PropertyMap)] = {
+
+    val requiredSeq = required.map(JavaConversions.asScalaBuffer(_).toSeq)
+
+    PEventStore.aggregateProperties(
+      appName,
+      entityType,
+      channelName,
+      startTime,
+      untilTime,
+      requiredSeq
+    )(sc)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/store/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/package.scala b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
new file mode 100644
index 0000000..36c592f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/store/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data
+
+/** Provides high-level interfaces to the Event Store from within a prediction
+ * engine.
+ */
+package object store {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
new file mode 100644
index 0000000..31937d5
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/DataView.scala
@@ -0,0 +1,110 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.data.storage.Event
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.joda.time.DateTime
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe._
+import scala.util.hashing.MurmurHash3
+
+/**
+ * :: Experimental ::
+ */
+@Experimental
+object DataView {
+  /**
+   * :: Experimental ::
+   *
+   * Create a DataFrame from events of a specified app.
+   *
+   * @param appName return events of this app
+   * @param channelName use events of this channel (default channel if it's None)
+   * @param startTime return events with eventTime >= startTime
+   * @param untilTime return events with eventTime < untilTime
+   * @param conversionFunction a function that turns raw Events into events of
+   *        interest. If conversionFunction returns None, such events are dropped.
+   * @param name a name identifying the DataFrame created
+   * @param version used to track changes to conversionFunction; e.g. set
+   *        version = "20150413" and update it whenever the function changes.
+   * @param sqlContext SQL context
+   * @tparam E the output type of the conversion function. The type needs to
+   *         extend Product (e.g. case class)
+   * @return a DataFrame of events
+   */
+  @Experimental
+  def create[E <: Product: TypeTag: ClassTag](
+    appName: String,
+    channelName: Option[String] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    conversionFunction: Event => Option[E],
+    name: String = "",
+    version: String = "")(sqlContext: SQLContext): DataFrame = {
+
+    @transient lazy val logger = Logger[this.type]
+
+    val sc = sqlContext.sparkContext
+
+    val beginTime = startTime match {
+      case Some(t) => t
+      case None => new DateTime(0L)
+    }
+    val endTime = untilTime match {
+      case Some(t) => t
+      case None => DateTime.now() // pin the current time so the cache key is stable
+    }
+    // detect changes to the case class
+    val uid = java.io.ObjectStreamClass.lookup(implicitly[reflect.ClassTag[E]].runtimeClass)
+      .getSerialVersionUID
+    val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid")
+    val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view"
+    val fileName = s"$baseDir/$name-$appName-$hash.parquet"
+    // Use the cached Parquet copy if it exists; otherwise read from the event
+    // store and cache the result.
+    try {
+      sqlContext.parquetFile(fileName)
+    } catch {
+      case e: java.io.FileNotFoundException =>
+        logger.info("Cached copy not found, reading from DB.")
+        val result: RDD[E] = PEventStore.find(
+          appName = appName,
+          channelName = channelName,
+          startTime = startTime,
+          untilTime = Some(endTime))(sc)
+          .flatMap((e) => conversionFunction(e))
+        import sqlContext.implicits._ // needed for RDD.toDF()
+        val resultDF = result.toDF()
+
+        resultDF.saveAsParquetFile(fileName)
+        sqlContext.parquetFile(fileName)
+      case e: java.lang.RuntimeException =>
+        if (e.toString.contains("is not a Parquet file")) {
+          logger.error(s"$fileName does not contain a valid Parquet file. " +
+            "Please delete it and try again.")
+        }
+        throw e
+    }
+  }
+}
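A hedged usage sketch for DataView.create (not part of this commit); ViewRow, the "view" event name, and "MyApp" are hypothetical, and PIO_FS_BASEDIR must point at a writable directory for the Parquet cache:

import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.view.DataView
import org.apache.spark.sql.SQLContext

// Flattened row type; must extend Product, e.g. a case class.
case class ViewRow(user: String, item: String, millis: Long)

def viewsDF(sqlContext: SQLContext) =
  DataView.create[ViewRow](
    appName = "MyApp",
    conversionFunction = (e: Event) =>
      if (e.event == "view") {
        Some(ViewRow(e.entityId, e.targetEntityId.getOrElse(""), e.eventTime.getMillis))
      } else {
        None
      },
    name = "views",
    version = "1")(sqlContext)

Subsequent calls with the same name, version, time range and case class reuse the cached Parquet file; bump version whenever conversionFunction changes.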
