http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala new file mode 100644 index 0000000..de74d46 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hbase.upgrade + +import org.apache.predictionio.annotation.Experimental + +import grizzled.slf4j.Logger +import org.apache.predictionio.data.storage.Storage +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.hbase.HBLEvents +import org.apache.predictionio.data.storage.hbase.HBEventsUtil + +import scala.collection.JavaConversions._ + +import scala.concurrent._ +import ExecutionContext.Implicits.global +import org.apache.predictionio.data.storage.LEvents +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import java.lang.Thread + +object CheckDistribution { + def entityType(eventClient: LEvents, appId: Int) + : Map[(String, Option[String]), Int] = { + eventClient + .find(appId = appId) + .foldLeft(Map[(String, Option[String]), Int]().withDefaultValue(0)) { + case (m, e) => { + val k = (e.entityType, e.targetEntityType) + m.updated(k, m(k) + 1) + } + } + } + + def runMain(appId: Int) { + val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] + + entityType(eventClient, appId) + .toSeq + .sortBy(-_._2) + .foreach { println } + + } + + def main(args: Array[String]) { + runMain(args(0).toInt) + } + +} + +/** :: Experimental :: */ +@Experimental +object Upgrade_0_8_3 { + val NameMap = Map( + "pio_user" -> "user", + "pio_item" -> "item") + val RevNameMap = NameMap.toSeq.map(_.swap).toMap + + val logger = Logger[this.type] + + def main(args: Array[String]) { + val fromAppId = args(0).toInt + val toAppId = args(1).toInt + + runMain(fromAppId, toAppId) + } + + def runMain(fromAppId: Int, toAppId: Int): Unit = { + upgrade(fromAppId, toAppId) + } + + + val obsEntityTypes = Set("pio_user", "pio_item") + val obsProperties = Set( + "pio_itypes", "pio_starttime", "pio_endtime", + "pio_inactive", "pio_price", "pio_rating") + + def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = { + eventClient.find(appId = appId).filter( e => + (obsEntityTypes.contains(e.entityType) || + e.targetEntityType.map(obsEntityTypes.contains(_)).getOrElse(false) || + 
(!e.properties.keySet.forall(!obsProperties.contains(_))) + ) + ).hasNext + } + + def isEmpty(eventClient: LEvents, appId: Int): Boolean = + !eventClient.find(appId = appId).hasNext + + + def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int) { + val fromDist = CheckDistribution.entityType(eventClient, fromAppId) + + logger.info("FromAppId Distribution") + fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } + + val events = eventClient + .find(appId = fromAppId) + .zipWithIndex + .foreach { case (fromEvent, index) => { + if (index % 50000 == 0) { + // logger.info(s"Progress: $fromEvent $index") + logger.info(s"Progress: $index") + } + + + val fromEntityType = fromEvent.entityType + val toEntityType = NameMap.getOrElse(fromEntityType, fromEntityType) + + val fromTargetEntityType = fromEvent.targetEntityType + val toTargetEntityType = fromTargetEntityType + .map { et => NameMap.getOrElse(et, et) } + + val toProperties = DataMap(fromEvent.properties.fields.map { + case (k, v) => + val newK = if (obsProperties.contains(k)) { + val nK = k.stripPrefix("pio_") + logger.info(s"property ${k} will be renamed to ${nK}") + nK + } else k + (newK, v) + }) + + val toEvent = fromEvent.copy( + entityType = toEntityType, + targetEntityType = toTargetEntityType, + properties = toProperties) + + eventClient.insert(toEvent, toAppId) + }} + + + val toDist = CheckDistribution.entityType(eventClient, toAppId) + + logger.info("Recap fromAppId Distribution") + fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } + + logger.info("ToAppId Distribution") + toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } + + val fromGood = fromDist + .toSeq + .forall { case (k, c) => { + val (et, tet) = k + val net = NameMap.getOrElse(et, et) + val ntet = tet.map(tet => NameMap.getOrElse(tet, tet)) + val nk = (net, ntet) + val nc = toDist.getOrElse(nk, -1) + val checkMatch = (c == nc) + if (!checkMatch) { + logger.info(s"${k} doesn't match: old has ${c}, new has ${nc}.") + } + checkMatch + }} + + val toGood = toDist + .toSeq + .forall { case (k, c) => { + val (et, tet) = k + val oet = RevNameMap.getOrElse(et, et) + val otet = tet.map(tet => RevNameMap.getOrElse(tet, tet)) + val ok = (oet, otet) + val oc = fromDist.getOrElse(ok, -1) + val checkMatch = (c == oc) + if (!checkMatch) { + logger.info(s"${k} doesn't match: new has ${c}, old has ${oc}.") + } + checkMatch + }} + + if (!fromGood || !toGood) { + logger.error("Counts do not match! There is an import error.") + } else { + logger.info("Counts match. Looks like we are good to go.") + } + } + + /* For upgrade from 0.8.2 to 0.8.3 only */ + def upgrade(fromAppId: Int, toAppId: Int) { + + val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] + + require(fromAppId != toAppId, + s"FromAppId: $fromAppId must be different from toAppId: $toAppId") + + if (hasPIOPrefix(eventClient, fromAppId)) { + require( + isEmpty(eventClient, toAppId), + s"Target appId: $toAppId is not empty. Please run " + + "`pio app data-delete <app_name>` to clean the data before upgrading") + + logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId)) + + upgradeCopy(eventClient, fromAppId, toAppId) + + } else { + logger.info(s"From appId: ${fromAppId} doesn't contain" + + s" obsolete entityTypes ${obsEntityTypes} or" + + s" obsolete properties ${obsProperties}." + + " No data migration is needed." + + s" You can continue to use appId ${fromAppId}.") + } + + logger.info("Done.") + } + + +}
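The Upgrade_0_8_3 tool above boils down to a pure renaming step: entity types and property keys carrying the obsolete pio_ prefix are mapped to their 0.8.3 names, and the per-(entityType, targetEntityType) event counts are then compared in both directions to confirm a lossless copy. Below is a minimal, self-contained sketch of that renaming step; SimpleEvent and migrate are hypothetical stand-ins for illustration, not the PredictionIO Event API.

// Sketch of the pio_ prefix renaming performed by upgradeCopy above.
// SimpleEvent is a hypothetical stand-in for org.apache.predictionio.data.storage.Event.
case class SimpleEvent(
  entityType: String,
  targetEntityType: Option[String],
  properties: Map[String, String])

object RenameSketch {
  val NameMap = Map("pio_user" -> "user", "pio_item" -> "item")
  val ObsProperties = Set(
    "pio_itypes", "pio_starttime", "pio_endtime",
    "pio_inactive", "pio_price", "pio_rating")

  def migrate(e: SimpleEvent): SimpleEvent = e.copy(
    entityType = NameMap.getOrElse(e.entityType, e.entityType),
    targetEntityType = e.targetEntityType.map(t => NameMap.getOrElse(t, t)),
    // Strip the pio_ prefix only from the known obsolete property keys.
    properties = e.properties.map { case (k, v) =>
      (if (ObsProperties.contains(k)) k.stripPrefix("pio_") else k, v)
    })

  def main(args: Array[String]): Unit = {
    val before = SimpleEvent("pio_user", Some("pio_item"), Map("pio_rating" -> "5"))
    println(migrate(before)) // SimpleEvent(user,Some(item),Map(rating -> 5))
  }
}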
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala new file mode 100644 index 0000000..b453820 --- /dev/null +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.view + +import org.apache.predictionio.data.storage.hbase.HBPEvents +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.EventValidation +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Storage + +import org.joda.time.DateTime + +import org.json4s.JValue + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + + +// each JValue data associated with the time it is set +private[predictionio] case class PropTime(val d: JValue, val t: Long) extends Serializable + +private[predictionio] case class SetProp ( + val fields: Map[String, PropTime], + // last set time. 
Note: fields could be empty with valid set time + val t: Long) extends Serializable { + + def ++ (that: SetProp): SetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, PropTime] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData.t > thatData.t) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + // keep the latest set time + val combinedT = if (this.t > that.t) this.t else that.t + + SetProp( + fields = combinedFields, + t = combinedT + ) + } +} + +private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable { + def ++ (that: UnsetProp): UnsetProp = { + val commonKeys = fields.keySet.intersect(that.fields.keySet) + + val common: Map[String, Long] = commonKeys.map { k => + val thisData = this.fields(k) + val thatData = that.fields(k) + // only keep the value with latest time + val v = if (thisData > thatData) thisData else thatData + (k, v) + }.toMap + + val combinedFields = common ++ + (this.fields -- commonKeys) ++ (that.fields -- commonKeys) + + UnsetProp( + fields = combinedFields + ) + } +} + +private[predictionio] case class DeleteEntity (t: Long) extends Serializable { + def ++ (that: DeleteEntity): DeleteEntity = { + if (this.t > that.t) this else that + } +} + +private[predictionio] case class EventOp ( + val setProp: Option[SetProp] = None, + val unsetProp: Option[UnsetProp] = None, + val deleteEntity: Option[DeleteEntity] = None +) extends Serializable { + + def ++ (that: EventOp): EventOp = { + EventOp( + setProp = (setProp ++ that.setProp).reduceOption(_ ++ _), + unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _), + deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _) + ) + } + + def toDataMap(): Option[DataMap] = { + setProp.flatMap { set => + + val unsetKeys: Set[String] = unsetProp.map( unset => + unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet + ).getOrElse(Set()) + + val combinedFields = deleteEntity.map { delete => + if (delete.t >= set.t) { + None + } else { + val deleteKeys: Set[String] = set.fields + .filter { case (k, PropTime(kv, t)) => + (delete.t >= t) + }.keySet + Some(set.fields -- unsetKeys -- deleteKeys) + } + }.getOrElse{ + Some(set.fields -- unsetKeys) + } + + // Note: mapValues() doesn't return concrete Map and causes + // NotSerializableException issue. Use map(identity) to work around this. 
+ // see https://issues.scala-lang.org/browse/SI-7005 + combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity))) + } + } + +} + +private[predictionio] object EventOp { + def apply(e: Event): EventOp = { + val t = e.eventTime.getMillis + e.event match { + case "$set" => { + val fields = e.properties.fields.mapValues(jv => + PropTime(jv, t) + ).map(identity) + + EventOp( + setProp = Some(SetProp(fields = fields, t = t)) + ) + } + case "$unset" => { + val fields = e.properties.fields.mapValues(jv => t).map(identity) + EventOp( + unsetProp = Some(UnsetProp(fields = fields)) + ) + } + case "$delete" => { + EventOp( + deleteEntity = Some(DeleteEntity(t)) + ) + } + case _ => { + EventOp() + } + } + } +} + +@deprecated("Use PEvents or PEventStore instead.", "0.9.2") +class PBatchView( + val appId: Int, + val startTime: Option[DateTime], + val untilTime: Option[DateTime], + val sc: SparkContext) { + + // NOTE: parallel Events DB interface + @transient lazy val eventsDb = Storage.getPEvents() + + @transient lazy val _events: RDD[Event] = + eventsDb.getByAppIdAndTimeAndEntity( + appId = appId, + startTime = startTime, + untilTime = untilTime, + entityType = None, + entityId = None)(sc) + + // TODO: change to use EventSeq? + @transient lazy val events: RDD[Event] = _events + + def aggregateProperties( + entityType: String, + startTimeOpt: Option[DateTime] = None, + untilTimeOpt: Option[DateTime] = None + ): RDD[(String, DataMap)] = { + + _events + .filter( e => ((e.entityType == entityType) && + (EventValidation.isSpecialEvents(e.event))) ) + .map( e => (e.entityId, EventOp(e) )) + .aggregateByKey[EventOp](EventOp())( + // within same partition + seqOp = { case (u, v) => u ++ v }, + // across partition + combOp = { case (accu, u) => accu ++ u } + ) + .mapValues(_.toDataMap) + .filter{ case (k, v) => v.isDefined } + .map{ case (k, v) => (k, v.get) } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hbase/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/storage/hbase/src/test/resources/application.conf b/storage/hbase/src/test/resources/application.conf new file mode 100644 index 0000000..eecae44 --- /dev/null +++ b/storage/hbase/src/test/resources/application.conf @@ -0,0 +1,28 @@ +org.apache.predictionio.data.storage { + sources { + mongodb { + type = mongodb + hosts = [localhost] + ports = [27017] + } + elasticsearch { + type = elasticsearch + hosts = [localhost] + ports = [9300] + } + } + repositories { + # This section is dummy just to make storage happy. + # The actual testing will not bypass these repository settings completely. + # Please refer to StorageTestUtils.scala. 
+ settings { + name = "test_predictionio" + source = mongodb + } + + appdata { + name = "test_predictionio_appdata" + source = mongodb + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/.gitignore ---------------------------------------------------------------------- diff --git a/storage/hdfs/.gitignore b/storage/hdfs/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/storage/hdfs/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/build.sbt ---------------------------------------------------------------------- diff --git a/storage/hdfs/build.sbt b/storage/hdfs/build.sbt new file mode 100644 index 0000000..9f064c6 --- /dev/null +++ b/storage/hdfs/build.sbt @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "apache-predictionio-data-hdfs" + +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided", + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.specs2" %% "specs2" % "2.3.13" % "test") + +parallelExecution in Test := false + +pomExtra := childrenPomExtra.value + +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true) + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat + case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) +} + +// skip test in assembly +test in assembly := {} + +outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-hdfs-assembly-" + version.value + ".jar") + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala ---------------------------------------------------------------------- diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala new file mode 100644 index 0000000..08dfb01 --- /dev/null +++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.hdfs + +import java.io.IOException + +import com.google.common.io.ByteStreams +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Model +import org.apache.predictionio.data.storage.Models +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + +class HDFSModels(fs: FileSystem, config: StorageClientConfig, prefix: String) + extends Models with Logging { + + def insert(i: Model): Unit = { + try { + val fsdos = fs.create(new Path(s"$prefix${i.id}")) + fsdos.write(i.models) + fsdos.close + } catch { + case e: IOException => error(e.getMessage) + } + } + + def get(id: String): Option[Model] = { + try { + val p = new Path(s"$prefix$id") + Some(Model( + id = id, + models = ByteStreams.toByteArray(fs.open(p)))) + } catch { + case e: Throwable => + error(e.getMessage) + None + } + } + + def delete(id: String): Unit = { + val p = new Path(s"$prefix$id") + if (!fs.delete(p, false)) { + error(s"Unable to delete ${fs.makeQualified(p).toString}!") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala ---------------------------------------------------------------------- diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala new file mode 100644 index 0000000..bc57f2a --- /dev/null +++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.hdfs + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.BaseStorageClient +import org.apache.predictionio.data.storage.StorageClientConfig +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + +class StorageClient(val config: StorageClientConfig) extends BaseStorageClient + with Logging { + override val prefix = "HDFS" + val conf = new Configuration + val fs = FileSystem.get(conf) + fs.setWorkingDirectory( + new Path(config.properties.getOrElse("PATH", config.properties("HOSTS")))) + val client = fs +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala ---------------------------------------------------------------------- diff --git a/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala new file mode 100644 index 0000000..a927d78 --- /dev/null +++ b/storage/hdfs/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage + +/** HDFS implementation of storage traits, supporting model data only + * + * @group Implementation + */ +package object hdfs {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/hdfs/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/storage/hdfs/src/test/resources/application.conf b/storage/hdfs/src/test/resources/application.conf new file mode 100644 index 0000000..eecae44 --- /dev/null +++ b/storage/hdfs/src/test/resources/application.conf @@ -0,0 +1,28 @@ +org.apache.predictionio.data.storage { + sources { + mongodb { + type = mongodb + hosts = [localhost] + ports = [27017] + } + elasticsearch { + type = elasticsearch + hosts = [localhost] + ports = [9300] + } + } + repositories { + # This section is dummy just to make storage happy. + # The actual testing will not bypass these repository settings completely. + # Please refer to StorageTestUtils.scala. 
+ settings { + name = "test_predictionio" + source = mongodb + } + + appdata { + name = "test_predictionio_appdata" + source = mongodb + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/.gitignore ---------------------------------------------------------------------- diff --git a/storage/jdbc/.gitignore b/storage/jdbc/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/storage/jdbc/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/build.sbt ---------------------------------------------------------------------- diff --git a/storage/jdbc/build.sbt b/storage/jdbc/build.sbt new file mode 100644 index 0000000..63d420b --- /dev/null +++ b/storage/jdbc/build.sbt @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "apache-predictionio-data-jdbc" + +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", + "org.postgresql" % "postgresql" % "9.4-1204-jdbc41", + "org.scalikejdbc" %% "scalikejdbc" % "2.3.5", + "org.scalatest" %% "scalatest" % "2.1.7" % "test", + "org.specs2" %% "specs2" % "2.3.13" % "test") + +parallelExecution in Test := false + +pomExtra := childrenPomExtra.value + +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true) + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat + case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) +} + +// skip test in assembly +test in assembly := {} + +outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-jdbc-assembly-" + version.value + ".jar") + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala new file mode 100644 index 0000000..437f8ae --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.AccessKey +import org.apache.predictionio.data.storage.AccessKeys +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +import scala.util.Random + +/** JDBC implementation of [[AccessKeys]] */ +class JDBCAccessKeys(client: String, config: StorageClientConfig, prefix: String) + extends AccessKeys with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "accesskeys") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + accesskey varchar(64) not null primary key, + appid integer not null, + events text)""".execute().apply() + } + + def insert(accessKey: AccessKey): Option[String] = DB localTx { implicit s => + val key = if (accessKey.key.isEmpty) generateKey else accessKey.key + val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(",")) + sql""" + insert into $tableName values( + $key, + ${accessKey.appid}, + $events)""".update().apply() + Some(key) + } + + def get(key: String): Option[AccessKey] = DB readOnly { implicit session => + sql"SELECT accesskey, appid, events FROM $tableName WHERE accesskey = $key". + map(resultToAccessKey).single().apply() + } + + def getAll(): Seq[AccessKey] = DB readOnly { implicit session => + sql"SELECT accesskey, appid, events FROM $tableName".map(resultToAccessKey).list().apply() + } + + def getByAppid(appid: Int): Seq[AccessKey] = DB readOnly { implicit session => + sql"SELECT accesskey, appid, events FROM $tableName WHERE appid = $appid". 
+ map(resultToAccessKey).list().apply() + } + + def update(accessKey: AccessKey): Unit = DB localTx { implicit session => + val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(",")) + sql""" + UPDATE $tableName SET + appid = ${accessKey.appid}, + events = $events + WHERE accesskey = ${accessKey.key}""".update().apply() + } + + def delete(key: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE accesskey = $key".update().apply() + } + + /** Convert JDBC results to [[AccessKey]] */ + def resultToAccessKey(rs: WrappedResultSet): AccessKey = { + AccessKey( + key = rs.string("accesskey"), + appid = rs.int("appid"), + events = rs.stringOpt("events").map(_.split(",").toSeq).getOrElse(Nil)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala new file mode 100644 index 0000000..17e6410 --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.App +import org.apache.predictionio.data.storage.Apps +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[Apps]] */ +class JDBCApps(client: String, config: StorageClientConfig, prefix: String) + extends Apps with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "apps") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id serial not null primary key, + name text not null, + description text)""".execute.apply() + } + + def insert(app: App): Option[Int] = DB localTx { implicit session => + val q = if (app.id == 0) { + sql""" + insert into $tableName (name, description) values(${app.name}, ${app.description}) + """ + } else { + sql""" + insert into $tableName values(${app.id}, ${app.name}, ${app.description}) + """ + } + Some(q.updateAndReturnGeneratedKey().apply().toInt) + } + + def get(id: Int): Option[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).single().apply() + } + + def getByName(name: String): Option[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).single().apply() + } + + def getAll(): Seq[App] = DB readOnly { implicit session => + sql"SELECT id, name, description FROM $tableName".map(rs => + App( + id = rs.int("id"), + name = rs.string("name"), + description = rs.stringOpt("description")) + ).list().apply() + } + + def update(app: App): Unit = DB localTx { implicit session => + sql""" + update $tableName set name = ${app.name}, description = ${app.description} + where id = ${app.id}""".update().apply() + } + + def delete(id: Int): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala new file mode 100644 index 0000000..c9aaca5 --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.data.storage.Channels +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[Channels]] */ +class JDBCChannels(client: String, config: StorageClientConfig, prefix: String) + extends Channels with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "channels") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id serial not null primary key, + name text not null, + appid integer not null)""".execute().apply() + } + + def insert(channel: Channel): Option[Int] = DB localTx { implicit session => + val q = if (channel.id == 0) { + sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})" + } else { + sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})" + } + Some(q.updateAndReturnGeneratedKey().apply().toInt) + } + + def get(id: Int): Option[Channel] = DB localTx { implicit session => + sql"SELECT id, name, appid FROM $tableName WHERE id = $id". + map(resultToChannel).single().apply() + } + + def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session => + sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid". + map(resultToChannel).list().apply() + } + + def delete(id: Int): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + def resultToChannel(rs: WrappedResultSet): Channel = { + Channel( + id = rs.int("id"), + name = rs.string("name"), + appid = rs.int("appid")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala new file mode 100644 index 0000000..13c374d --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.EngineInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementation of [[EngineInstances]] */ +class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String) + extends EngineInstances with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + status text not null, + startTime timestamp DEFAULT CURRENT_TIMESTAMP, + endTime timestamp DEFAULT CURRENT_TIMESTAMP, + engineId text not null, + engineVersion text not null, + engineVariant text not null, + engineFactory text not null, + batch text not null, + env text not null, + sparkConf text not null, + datasourceParams text not null, + preparatorParams text not null, + algorithmsParams text not null, + servingParams text not null)""".execute().apply() + } + + def insert(i: EngineInstance): String = DB localTx { implicit session => + val id = java.util.UUID.randomUUID().toString + sql""" + INSERT INTO $tableName VALUES( + $id, + ${i.status}, + ${i.startTime}, + ${i.endTime}, + ${i.engineId}, + ${i.engineVersion}, + ${i.engineVariant}, + ${i.engineFactory}, + ${i.batch}, + ${JDBCUtils.mapToString(i.env)}, + ${JDBCUtils.mapToString(i.sparkConf)}, + ${i.dataSourceParams}, + ${i.preparatorParams}, + ${i.algorithmsParams}, + ${i.servingParams})""".update().apply() + id + } + + def get(id: String): Option[EngineInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + servingParams + FROM $tableName WHERE id = $id""".map(resultToEngineInstance). + single().apply() + } + + def getAll(): Seq[EngineInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + servingParams + FROM $tableName""".map(resultToEngineInstance).list().apply() + } + + def getLatestCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = + getCompleted(engineId, engineVersion, engineVariant).headOption + + def getCompleted( + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s => + sql""" + SELECT + id, + status, + startTime, + endTime, + engineId, + engineVersion, + engineVariant, + engineFactory, + batch, + env, + sparkConf, + datasourceParams, + preparatorParams, + algorithmsParams, + servingParams + FROM $tableName + WHERE + status = 'COMPLETED' AND + engineId = $engineId AND + engineVersion = $engineVersion AND + engineVariant = $engineVariant + ORDER BY startTime DESC""". 
+ map(resultToEngineInstance).list().apply() + } + + def update(i: EngineInstance): Unit = DB localTx { implicit session => + sql""" + update $tableName set + status = ${i.status}, + startTime = ${i.startTime}, + endTime = ${i.endTime}, + engineId = ${i.engineId}, + engineVersion = ${i.engineVersion}, + engineVariant = ${i.engineVariant}, + engineFactory = ${i.engineFactory}, + batch = ${i.batch}, + env = ${JDBCUtils.mapToString(i.env)}, + sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, + datasourceParams = ${i.dataSourceParams}, + preparatorParams = ${i.preparatorParams}, + algorithmsParams = ${i.algorithmsParams}, + servingParams = ${i.servingParams} + where id = ${i.id}""".update().apply() + } + + def delete(id: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + /** Convert JDBC results to [[EngineInstance]] */ + def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = { + EngineInstance( + id = rs.string("id"), + status = rs.string("status"), + startTime = rs.jodaDateTime("startTime"), + endTime = rs.jodaDateTime("endTime"), + engineId = rs.string("engineId"), + engineVersion = rs.string("engineVersion"), + engineVariant = rs.string("engineVariant"), + engineFactory = rs.string("engineFactory"), + batch = rs.string("batch"), + env = JDBCUtils.stringToMap(rs.string("env")), + sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), + dataSourceParams = rs.string("datasourceParams"), + preparatorParams = rs.string("preparatorParams"), + algorithmsParams = rs.string("algorithmsParams"), + servingParams = rs.string("servingParams")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala new file mode 100644 index 0000000..90eb5f3 --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EvaluationInstance +import org.apache.predictionio.data.storage.EvaluationInstances +import org.apache.predictionio.data.storage.StorageClientConfig +import scalikejdbc._ + +/** JDBC implementations of [[EvaluationInstances]] */ +class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String) + extends EvaluationInstances with Logging { + /** Database table name for this data access object */ + val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances") + DB autoCommit { implicit session => + sql""" + create table if not exists $tableName ( + id varchar(100) not null primary key, + status text not null, + startTime timestamp DEFAULT CURRENT_TIMESTAMP, + endTime timestamp DEFAULT CURRENT_TIMESTAMP, + evaluationClass text not null, + engineParamsGeneratorClass text not null, + batch text not null, + env text not null, + sparkConf text not null, + evaluatorResults text not null, + evaluatorResultsHTML text not null, + evaluatorResultsJSON text)""".execute().apply() + } + + def insert(i: EvaluationInstance): String = DB localTx { implicit session => + val id = java.util.UUID.randomUUID().toString + sql""" + INSERT INTO $tableName VALUES( + $id, + ${i.status}, + ${i.startTime}, + ${i.endTime}, + ${i.evaluationClass}, + ${i.engineParamsGeneratorClass}, + ${i.batch}, + ${JDBCUtils.mapToString(i.env)}, + ${JDBCUtils.mapToString(i.sparkConf)}, + ${i.evaluatorResults}, + ${i.evaluatorResultsHTML}, + ${i.evaluatorResultsJSON})""".update().apply() + id + } + + def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + FROM $tableName WHERE id = $id + """.map(resultToEvaluationInstance).single().apply() + } + + def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + FROM $tableName + """.map(resultToEvaluationInstance).list().apply() + } + + def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s => + sql""" + SELECT + id, + status, + startTime, + endTime, + evaluationClass, + engineParamsGeneratorClass, + batch, + env, + sparkConf, + evaluatorResults, + evaluatorResultsHTML, + evaluatorResultsJSON + FROM $tableName + WHERE + status = 'EVALCOMPLETED' + ORDER BY starttime DESC + """.map(resultToEvaluationInstance).list().apply() + } + + def update(i: EvaluationInstance): Unit = DB localTx { implicit session => + sql""" + update $tableName set + status = ${i.status}, + startTime = ${i.startTime}, + endTime = ${i.endTime}, + evaluationClass = ${i.evaluationClass}, + engineParamsGeneratorClass = ${i.engineParamsGeneratorClass}, + batch = ${i.batch}, + env = ${JDBCUtils.mapToString(i.env)}, + sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, + evaluatorResults = ${i.evaluatorResults}, + evaluatorResultsHTML = ${i.evaluatorResultsHTML}, + evaluatorResultsJSON = ${i.evaluatorResultsJSON} + where id = ${i.id}""".update().apply() + } + + def delete(id: String): Unit = DB localTx { implicit session => + sql"DELETE FROM $tableName WHERE id = $id".update().apply() + } + + /** 
Convert JDBC results to [[EvaluationInstance]] */ + def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = { + EvaluationInstance( + id = rs.string("id"), + status = rs.string("status"), + startTime = rs.jodaDateTime("startTime"), + endTime = rs.jodaDateTime("endTime"), + evaluationClass = rs.string("evaluationClass"), + engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"), + batch = rs.string("batch"), + env = JDBCUtils.stringToMap(rs.string("env")), + sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), + evaluatorResults = rs.string("evaluatorResults"), + evaluatorResultsHTML = rs.string("evaluatorResultsHTML"), + evaluatorResultsJSON = rs.string("evaluatorResultsJSON")) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala new file mode 100644 index 0000000..dddef67 --- /dev/null +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.predictionio.data.storage.jdbc + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.DataMap +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.LEvents +import org.apache.predictionio.data.storage.StorageClientConfig +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.json4s.JObject +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import scalikejdbc._ + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +/** JDBC implementation of [[LEvents]] */ +class JDBCLEvents( + client: String, + config: StorageClientConfig, + namespace: String) extends LEvents with Logging { + implicit private val formats = org.json4s.DefaultFormats + + def init(appId: Int, channelId: Option[Int] = None): Boolean = { + + // To use index, it must be varchar less than 255 characters on a VARCHAR column + val useIndex = config.properties.contains("INDEX") && + config.properties("INDEX").equalsIgnoreCase("enabled") + + val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) + val entityIdIndexName = s"idx_${tableName}_ei" + val entityTypeIndexName = s"idx_${tableName}_et" + DB autoCommit { implicit session => + if (useIndex) { + SQL(s""" + create table if not exists $tableName ( + id varchar(32) not null primary key, + event varchar(255) not null, + entityType varchar(255) not null, + entityId varchar(255) not null, + targetEntityType text, + targetEntityId text, + properties text, + eventTime timestamp DEFAULT CURRENT_TIMESTAMP, + eventTimeZone varchar(50) not null, + tags text, + prId text, + creationTime timestamp DEFAULT CURRENT_TIMESTAMP, + creationTimeZone varchar(50) not null)""").execute().apply() + + // create index + SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply() + SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply() + } else { + SQL(s""" + create table if not exists $tableName ( + id varchar(32) not null primary key, + event text not null, + entityType text not null, + entityId text not null, + targetEntityType text, + targetEntityId text, + properties text, + eventTime timestamp DEFAULT CURRENT_TIMESTAMP, + eventTimeZone varchar(50) not null, + tags text, + prId text, + creationTime timestamp DEFAULT CURRENT_TIMESTAMP, + creationTimeZone varchar(50) not null)""").execute().apply() + } + true + } + } + + def remove(appId: Int, channelId: Option[Int] = None): Boolean = + DB autoCommit { implicit session => + SQL(s""" + drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)} + """).execute().apply() + true + } + + def close(): Unit = ConnectionPool.closeAll() + + def futureInsert(event: Event, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[String] = Future { + DB localTx { implicit session => + val id = event.eventId.getOrElse(JDBCUtils.generateId) + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + insert into $tableName values( + $id, + ${event.event}, + ${event.entityType}, + ${event.entityId}, + ${event.targetEntityType}, + ${event.targetEntityId}, + ${write(event.properties.toJObject)}, + ${event.eventTime}, + ${event.eventTime.getZone.getID}, + ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None}, + ${event.prId}, + ${event.creationTime}, + ${event.creationTime.getZone.getID} + ) + """.update().apply() + id + } + } + + def 
futureGet(eventId: String, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[Option[Event]] = Future { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + select + id, + event, + entityType, + entityId, + targetEntityType, + targetEntityId, + properties, + eventTime, + eventTimeZone, + tags, + prId, + creationTime, + creationTimeZone + from $tableName + where id = $eventId + """.map(resultToEvent).single().apply() + } + } + + def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[Boolean] = Future { + DB localTx { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + delete from $tableName where id = $eventId + """.update().apply() + true + } + } + + def futureFind( + appId: Int, + channelId: Option[Int] = None, + startTime: Option[DateTime] = None, + untilTime: Option[DateTime] = None, + entityType: Option[String] = None, + entityId: Option[String] = None, + eventNames: Option[Seq[String]] = None, + targetEntityType: Option[Option[String]] = None, + targetEntityId: Option[Option[String]] = None, + limit: Option[Int] = None, + reversed: Option[Boolean] = None + )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future { + DB readOnly { implicit session => + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + val whereClause = sqls.toAndConditionOpt( + startTime.map(x => sqls"eventTime >= $x"), + untilTime.map(x => sqls"eventTime < $x"), + entityType.map(x => sqls"entityType = $x"), + entityId.map(x => sqls"entityId = $x"), + eventNames.map(x => + sqls.toOrConditionOpt(x.map(y => + Some(sqls"event = $y") + ): _*) + ).getOrElse(None), + targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y") + .getOrElse(sqls"targetEntityType IS NULL")), + targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y") + .getOrElse(sqls"targetEntityId IS NULL")) + ).map(sqls.where(_)).getOrElse(sqls"") + val orderByClause = reversed.map(x => + if (x) sqls"eventTime desc" else sqls"eventTime asc" + ).getOrElse(sqls"eventTime asc") + val limitClause = limit.map(x => + if (x < 0) sqls"" else sqls.limit(x) + ).getOrElse(sqls"") + val q = sql""" + select + id, + event, + entityType, + entityId, + targetEntityType, + targetEntityId, + properties, + eventTime, + eventTimeZone, + tags, + prId, + creationTime, + creationTimeZone + from $tableName + $whereClause + order by $orderByClause + $limitClause + """ + q.map(resultToEvent).list().apply().toIterator + } + } + + private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = { + Event( + eventId = rs.stringOpt("id"), + event = rs.string("event"), + entityType = rs.string("entityType"), + entityId = rs.string("entityId"), + targetEntityType = rs.stringOpt("targetEntityType"), + targetEntityId = rs.stringOpt("targetEntityId"), + properties = rs.stringOpt("properties").map(p => + DataMap(read[JObject](p))).getOrElse(DataMap()), + eventTime = new DateTime(rs.jodaDateTime("eventTime"), + DateTimeZone.forID(rs.string("eventTimeZone"))), + tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil), + prId = rs.stringOpt("prId"), + creationTime = new DateTime(rs.jodaDateTime("creationTime"), + DateTimeZone.forID(rs.string("creationTimeZone"))) + ) + } +} 
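One subtlety in JDBCLEvents.futureFind above is its Option[Option[String]] encoding for the target-entity filters: None applies no constraint, Some(None) matches only events with no target entity (rendered as targetEntityType IS NULL), and Some(Some(x)) matches an exact value. A minimal in-memory sketch of the same predicate follows; the matches helper is hypothetical, for illustration only, and not part of the PredictionIO API.

// Sketch of the Option[Option[String]] filter semantics used by futureFind.
// matches() is a hypothetical helper, not part of PredictionIO.
object FilterSemanticsSketch {
  def matches(filter: Option[Option[String]], value: Option[String]): Boolean =
    filter match {
      case None          => true              // no constraint: everything passes
      case Some(None)    => value.isEmpty     // SQL: targetEntityType IS NULL
      case Some(Some(x)) => value.contains(x) // SQL: targetEntityType = x
    }

  def main(args: Array[String]): Unit = {
    assert(matches(None, Some("item")))              // unconstrained
    assert(matches(Some(None), None))                // must have no target entity
    assert(!matches(Some(None), Some("item")))
    assert(matches(Some(Some("item")), Some("item")))
  }
}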
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
new file mode 100644
index 0000000..b48502a
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+import scalikejdbc._
+
+/** JDBC implementation of [[Models]] */
+class JDBCModels(client: String, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+  /** Database table name for this data access object */
+  val tableName = JDBCUtils.prefixTableName(prefix, "models")
+
+  /** Determines binary column type based on JDBC driver type */
+  val binaryColumnType = JDBCUtils.binaryColumnType(client)
+  DB autoCommit { implicit session =>
+    sql"""
+    create table if not exists $tableName (
+      id varchar(100) not null primary key,
+      models $binaryColumnType not null)""".execute().apply()
+  }
+
+  def insert(i: Model): Unit = DB localTx { implicit session =>
+    sql"insert into $tableName values(${i.id}, ${i.models})".update().apply()
+  }
+
+  def get(id: String): Option[Model] = DB readOnly { implicit session =>
+    sql"select id, models from $tableName where id = $id".map { r =>
+      Model(id = r.string("id"), models = r.bytes("models"))
+    }.single().apply()
+  }
+
+  def delete(id: String): Unit = DB localTx { implicit session =>
+    sql"delete from $tableName where id = $id".execute().apply()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
new file mode 100644
index 0000000..2e6ee83
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import java.sql.{DriverManager, ResultSet}
+
+import com.github.nscala_time.time.Imports._
+import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{JdbcRDD, RDD}
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.json4s.JObject
+import org.json4s.native.Serialization
+import scalikejdbc._
+
+/** JDBC implementation of [[PEvents]] */
+class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents {
+  @transient private implicit lazy val formats = org.json4s.DefaultFormats
+  def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+    val lower = startTime.map(_.getMillis).getOrElse(0.toLong)
+    /** Change the default upper bound from +100 years to +1 year because
+      * MySQL's FROM_UNIXTIME(t) will return NULL if we use +100 years.
+      */
+    val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis)
+    val par = scala.math.min(
+      new Duration(upper - lower).getStandardDays,
+      config.properties.getOrElse("PARTITIONS", "4").toLong).toInt
+    // JdbcRDD binds only the two timestamp placeholders ('?') below, so the
+    // optional filters are embedded directly into the query string.
+    val entityTypeClause = entityType.map(x => s"and entityType = '$x'").getOrElse("")
+    val entityIdClause = entityId.map(x => s"and entityId = '$x'").getOrElse("")
+    val eventNamesClause =
+      eventNames.map("and (" + _.map(y => s"event = '$y'").mkString(" or ") + ")").getOrElse("")
+    val targetEntityTypeClause = targetEntityType.map(
+      _.map(x => s"and targetEntityType = '$x'"
+      ).getOrElse("and targetEntityType is null")).getOrElse("")
+    val targetEntityIdClause = targetEntityId.map(
+      _.map(x => s"and targetEntityId = '$x'"
+      ).getOrElse("and targetEntityId is null")).getOrElse("")
+    val q = s"""
+    select
+      id,
+      event,
+      entityType,
+      entityId,
+      targetEntityType,
+      targetEntityId,
+      properties,
+      eventTime,
+      eventTimeZone,
+      tags,
+      prId,
+      creationTime,
+      creationTimeZone
+    from ${JDBCUtils.eventTableName(namespace, appId, channelId)}
+    where
+      eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and
+      eventTime < ${JDBCUtils.timestampFunction(client)}(?)
+      $entityTypeClause
+      $entityIdClause
+      $eventNamesClause
+      $targetEntityTypeClause
+      $targetEntityIdClause
+    """.replace("\n", " ")
+    new JdbcRDD(
+      sc,
+      () => {
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      },
+      q,
+      lower / 1000,
+      upper / 1000,
+      par,
+      (r: ResultSet) => {
+        Event(
+          eventId = Option(r.getString("id")),
+          event = r.getString("event"),
+          entityType = r.getString("entityType"),
+          entityId = r.getString("entityId"),
+          targetEntityType = Option(r.getString("targetEntityType")),
+          targetEntityId = Option(r.getString("targetEntityId")),
+          properties = Option(r.getString("properties")).map(x =>
+            DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()),
+          eventTime = new DateTime(r.getTimestamp("eventTime").getTime,
+            DateTimeZone.forID(r.getString("eventTimeZone"))),
+          tags = Option(r.getString("tags")).map(x =>
+            x.split(",").toList).getOrElse(Nil),
+          prId = Option(r.getString("prId")),
+          creationTime = new DateTime(r.getTimestamp("creationTime").getTime,
+            DateTimeZone.forID(r.getString("creationTimeZone"))))
+      }).cache()
+  }
+
+  def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val sqlContext = new SQLContext(sc)
+
+    import sqlContext.implicits._
+
+    val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+
+    val eventTableColumns = Seq[String](
+      "id"
+      , "event"
+      , "entityType"
+      , "entityId"
+      , "targetEntityType"
+      , "targetEntityId"
+      , "properties"
+      , "eventTime"
+      , "eventTimeZone"
+      , "tags"
+      , "prId"
+      , "creationTime"
+      , "creationTimeZone")
+
+    val eventDF = events.map(x =>
+      Event(eventId = None, event = x.event, entityType = x.entityType,
+        entityId = x.entityId, targetEntityType = x.targetEntityType,
+        targetEntityId = x.targetEntityId, properties = x.properties,
+        eventTime = x.eventTime, tags = x.tags, prId = x.prId,
+        creationTime = x.eventTime)
+    )
+    .map { event =>
+      (event.eventId.getOrElse(JDBCUtils.generateId)
+      , event.event
+      , event.entityType
+      , event.entityId
+      , event.targetEntityType.orNull
+      , event.targetEntityId.orNull
+      , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null
+      , new java.sql.Timestamp(event.eventTime.getMillis)
+      , event.eventTime.getZone.getID
+      , if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None
+      , event.prId
+      , new java.sql.Timestamp(event.creationTime.getMillis)
+      , event.creationTime.getZone.getID)
+    }.toDF(eventTableColumns: _*)
+
+    // requires Spark 1.4.0 or higher for DataFrameWriter.jdbc
+    val prop = new java.util.Properties
+    prop.setProperty("user", config.properties("USERNAME"))
+    prop.setProperty("password", config.properties("PASSWORD"))
+    eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop)
+  }
+
+  def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    eventIds.foreachPartition { iter =>
+      iter.foreach { eventId =>
+        DB localTx { implicit session =>
+          val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
+          val table = SQLSyntax.createUnsafely(tableName)
+          sql"""
+          delete from $table where id = $eventId
+          """.update().apply()
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
new file mode 100644
index 0000000..3eb55ba
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** JDBC related utilities */
+object JDBCUtils {
+  /** Extract JDBC driver type from URL
+    *
+    * @param url JDBC URL
+    * @return The driver type, e.g. postgresql
+    */
+  def driverType(url: String): String = {
+    val capture = """jdbc:([^:]+):""".r
+    capture findFirstIn url match {
+      case Some(capture(driverType)) => driverType
+      case None => ""
+    }
+  }
+
+  /** Determines binary column type from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Binary column type as SQLSyntax, e.g. LONGBLOB
+    */
+  def binaryColumnType(url: String): SQLSyntax = {
+    driverType(url) match {
+      case "postgresql" => sqls"bytea"
+      case "mysql" => sqls"longblob"
+      case _ => sqls"longblob"
+    }
+  }
+
+  /** Determines UNIX timestamp conversion function from JDBC URL
+    *
+    * @param url JDBC URL
+    * @return Timestamp conversion function, e.g. TO_TIMESTAMP
+    */
+  def timestampFunction(url: String): String = {
+    driverType(url) match {
+      case "postgresql" => "to_timestamp"
+      case "mysql" => "from_unixtime"
+      case _ => "from_unixtime"
+    }
+  }
+
+  /** Converts a Map of String to String to a comma-separated list of key=value pairs
+    *
+    * @param m Map of String to String
+    * @return Comma-separated list, e.g. FOO=BAR,X=Y,...
+    */
+  def mapToString(m: Map[String, String]): String = {
+    m.map(t => s"${t._1}=${t._2}").mkString(",")
+  }
+
+  /** Inverse of mapToString
+    *
+    * @param str Comma-separated list, e.g. FOO=BAR,X=Y,...
+    * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...)
+    */
+  def stringToMap(str: String): Map[String, String] = {
+    str.split(",").map { x =>
+      val y = x.split("=")
+      y(0) -> y(1)
+    }.toMap[String, String]
+  }
+
+  /** Generate a 32-character random ID using UUID with hyphens stripped */
+  def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "")
+
+  /** Prefix a table name
+    *
+    * @param prefix Table prefix
+    * @param table Table name
+    * @return Prefixed table name
+    */
+  def prefixTableName(prefix: String, table: String): SQLSyntax =
+    sqls.createUnsafely(s"${prefix}_$table")
+
+  /** Derive event table name
+    *
+    * @param namespace Namespace of event tables
+    * @param appId App ID
+    * @param channelId Optional channel ID
+    * @return Full event table name
+    */
+  def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String =
+    s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}"
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
new file mode 100644
index 0000000..661e05e
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import scalikejdbc._
+
+/** JDBC implementation of [[BaseStorageClient]] */
+class StorageClient(val config: StorageClientConfig)
+  extends BaseStorageClient with Logging {
+  override val prefix = "JDBC"
+
+  if (!config.properties.contains("URL")) {
+    throw new StorageClientException("The URL variable is not set!", null)
+  }
+  if (!config.properties.contains("USERNAME")) {
+    throw new StorageClientException("The USERNAME variable is not set!", null)
+  }
+  if (!config.properties.contains("PASSWORD")) {
+    throw new StorageClientException("The PASSWORD variable is not set!", null)
+  }
+
+  // set the maximum size of the connection pool
+  val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt
+  val settings = ConnectionPoolSettings(maxSize = maxSize)
+
+  ConnectionPool.singleton(
+    config.properties("URL"),
+    config.properties("USERNAME"),
+    config.properties("PASSWORD"),
+    settings)
+  /** JDBC connection URL. Connections are managed by ScalikeJDBC.
+    */
+  val client = config.properties("URL")
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
new file mode 100644
index 0000000..e552e54
--- /dev/null
+++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage
+
+/** JDBC implementation of storage traits, supporting meta data, event data, and
+  * model data
+  *
+  * @group Implementation
+  */
+package object jdbc {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/jdbc/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/resources/application.conf b/storage/jdbc/src/test/resources/application.conf
new file mode 100644
index 0000000..eecae44
--- /dev/null
+++ b/storage/jdbc/src/test/resources/application.conf
@@ -0,0 +1,28 @@
+org.apache.predictionio.data.storage {
+  sources {
+    mongodb {
+      type = mongodb
+      hosts = [localhost]
+      ports = [27017]
+    }
+    elasticsearch {
+      type = elasticsearch
+      hosts = [localhost]
+      ports = [9300]
+    }
+  }
+  repositories {
+    # This section is dummy just to make storage happy.
+    # The actual testing will not bypass these repository settings completely.
+    # Please refer to StorageTestUtils.scala.
+    settings {
+      name = "test_predictionio"
+      source = mongodb
+    }
+
+    appdata {
+      name = "test_predictionio_appdata"
+      source = mongodb
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/.gitignore
----------------------------------------------------------------------
diff --git a/storage/localfs/.gitignore b/storage/localfs/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/storage/localfs/.gitignore
@@ -0,0 +1 @@
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/build.sbt
----------------------------------------------------------------------
diff --git a/storage/localfs/build.sbt b/storage/localfs/build.sbt
new file mode 100644
index 0000000..2cf9977
--- /dev/null
+++ b/storage/localfs/build.sbt
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "apache-predictionio-data-localfs"
+
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-data" % version.value % "provided",
+  "org.scalatest" %% "scalatest" % "2.1.7" % "test",
+  "org.specs2" %% "specs2" % "2.3.13" % "test")
+
+parallelExecution in Test := false
+
+pomExtra := childrenPomExtra.value
+
+assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = true)
+
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
+  case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+  case x =>
+    val oldStrategy = (assemblyMergeStrategy in assembly).value
+    oldStrategy(x)
+}
+
+// skip test in assembly
+test in assembly := {}
+
+outputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / "assembly" / "spark" / ("pio-data-localfs-assembly-" + version.value + ".jar")
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
----------------------------------------------------------------------
diff --git a/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
new file mode 100644
index 0000000..f528af9
--- /dev/null
+++ b/storage/localfs/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.localfs
+
+import java.io.File
+import java.io.FileNotFoundException
+import java.io.FileOutputStream
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import scala.io.Source
+
+class LocalFSModels(f: File, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  def insert(i: Model): Unit = {
+    try {
+      val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}"))
+      fos.write(i.models)
+      fos.close()
+    } catch {
+      case e: FileNotFoundException => error(e.getMessage)
+    }
+  }
+
+  def get(id: String): Option[Model] = {
+    try {
+      Some(Model(
+        id = id,
+        models = Source.fromFile(new File(f, s"${prefix}${id}"))(
+          scala.io.Codec.ISO8859).map(_.toByte).toArray))
+    } catch {
+      case e: Throwable =>
+        error(e.getMessage)
+        None
+    }
+  }
+
+  def delete(id: String): Unit = {
+    val m = new File(f, s"${prefix}${id}")
+    if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!")
+  }
+}
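LocalFSModels stores each serialized model as a single file named by prefix plus model ID, reading it back byte-for-byte through the ISO-8859-1 codec. A quick round-trip might look like the sketch below (not part of the patch; the no-argument StorageClientConfig() constructor, the temp-directory path, and the "sketch_" prefix are assumptions for illustration):

// Sketch only: write, read back, and delete a model via LocalFSModels.
import java.io.File
import org.apache.predictionio.data.storage.{Model, StorageClientConfig}
import org.apache.predictionio.data.storage.localfs.LocalFSModels

object LocalFSModelsSketch extends App {
  // use the system temp directory as the model store
  val dir = new File(System.getProperty("java.io.tmpdir"))
  val models = new LocalFSModels(dir, StorageClientConfig(), "sketch_")

  models.insert(Model(id = "m1", models = "hello".getBytes("ISO-8859-1")))
  // get reads the file back byte-for-byte via the ISO-8859-1 codec
  models.get("m1").foreach(m => println(new String(m.models, "ISO-8859-1"))) // hello
  models.delete("m1")
}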
