http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala deleted file mode 100644 index cc07fa4..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation -import org.apache.predictionio.data.storage.DataMap - -import org.apache.hadoop.hbase.client.Scan -import org.apache.hadoop.hbase.client.HConnection -import org.apache.hadoop.hbase.client.Result -import org.apache.hadoop.hbase.TableName -import org.apache.hadoop.hbase.util.Bytes - -import org.joda.time.DateTime -import org.joda.time.DateTimeZone - -import org.json4s.DefaultFormats -import org.json4s.JObject -import org.json4s.native.Serialization.{ read, write } - -import org.apache.commons.codec.binary.Base64 - -import scala.collection.JavaConversions._ - -/** :: Experimental :: */ -@Experimental -object HB_0_8_0 { - - implicit val formats = DefaultFormats - - def getByAppId( - connection: HConnection, - namespace: String, - appId: Int): Iterator[Event] = { - val tableName = TableName.valueOf(namespace, "events") - val table = connection.getTable(tableName) - val start = PartialRowKey(appId) - val stop = PartialRowKey(appId + 1) - val scan = new Scan(start.toBytes, stop.toBytes) - val scanner = table.getScanner(scan) - table.close() - scanner.iterator().map { resultToEvent(_) } - } - - val colNames: Map[String, Array[Byte]] = Map( - "event" -> "e", - "entityType" -> "ety", - "entityId" -> "eid", - "targetEntityType" -> "tety", - "targetEntityId" -> "teid", - "properties" -> "p", - "prId" -> "pk", // columna name is 'pk' in 0.8.0/0.8.1 - "eventTimeZone" -> "etz", - "creationTimeZone" -> "ctz" - ).mapValues(Bytes.toBytes(_)) - - - class RowKey( - val appId: Int, - val millis: Long, - val uuidLow: Long - ) { - lazy val toBytes: Array[Byte] = { - // add UUID least significant bits for multiple actions at the same time - // (UUID's most significant bits are actually timestamp, - // use eventTime instead). - Bytes.toBytes(appId) ++ Bytes.toBytes(millis) ++ Bytes.toBytes(uuidLow) - } - override def toString: String = { - Base64.encodeBase64URLSafeString(toBytes) - } - } - - object RowKey { - // get RowKey from string representation - def apply(s: String): RowKey = { - try { - apply(Base64.decodeBase64(s)) - } catch { - case e: Exception => throw new RowKeyException( - s"Failed to convert String ${s} to RowKey because ${e}", e) - } - } - - def apply(b: Array[Byte]): RowKey = { - if (b.size != 20) { - val bString = b.mkString(",") - throw new RowKeyException( - s"Incorrect byte array size. Bytes: ${bString}.") - } - - new RowKey( - appId = Bytes.toInt(b.slice(0, 4)), - millis = Bytes.toLong(b.slice(4, 12)), - uuidLow = Bytes.toLong(b.slice(12, 20)) - ) - } - } - - class RowKeyException(msg: String, cause: Exception) - extends Exception(msg, cause) { - def this(msg: String) = this(msg, null) - } - - case class PartialRowKey(val appId: Int, val millis: Option[Long] = None) { - val toBytes: Array[Byte] = { - Bytes.toBytes(appId) ++ - (millis.map(Bytes.toBytes(_)).getOrElse(Array[Byte]())) - } - } - - def resultToEvent(result: Result): Event = { - val rowKey = RowKey(result.getRow()) - - val eBytes = Bytes.toBytes("e") - // val e = result.getFamilyMap(eBytes) - - def getStringCol(col: String): String = { - val r = result.getValue(eBytes, colNames(col)) - require(r != null, - s"Failed to get value for column ${col}. " + - s"Rowkey: ${rowKey.toString} " + - s"StringBinary: ${Bytes.toStringBinary(result.getRow())}.") - - Bytes.toString(r) - } - - def getOptStringCol(col: String): Option[String] = { - val r = result.getValue(eBytes, colNames(col)) - if (r == null) { - None - } else { - Some(Bytes.toString(r)) - } - } - - def getTimestamp(col: String): Long = { - result.getColumnLatestCell(eBytes, colNames(col)).getTimestamp() - } - - val event = getStringCol("event") - val entityType = getStringCol("entityType") - val entityId = getStringCol("entityId") - val targetEntityType = getOptStringCol("targetEntityType") - val targetEntityId = getOptStringCol("targetEntityId") - val properties: DataMap = getOptStringCol("properties") - .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) - val prId = getOptStringCol("prId") - val eventTimeZone = getOptStringCol("eventTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val creationTimeZone = getOptStringCol("creationTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - - val creationTime: DateTime = new DateTime( - getTimestamp("event"), creationTimeZone - ) - - Event( - eventId = Some(RowKey(result.getRow()).toString), - event = event, - entityType = entityType, - entityId = entityId, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - properties = properties, - eventTime = new DateTime(rowKey.millis, eventTimeZone), - tags = Seq(), - prId = prId, - creationTime = creationTime - ) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala deleted file mode 100644 index 1759561..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import org.apache.predictionio.data.storage.Storage -import org.apache.predictionio.data.storage.hbase.HBLEvents -import org.apache.predictionio.data.storage.hbase.HBEventsUtil - -import scala.collection.JavaConversions._ - -/** :: Experimental :: */ -@Experimental -object Upgrade { - - def main(args: Array[String]) { - val fromAppId = args(0).toInt - val toAppId = args(1).toInt - val batchSize = args.lift(2).map(_.toInt).getOrElse(100) - val fromNamespace = args.lift(3).getOrElse("predictionio_eventdata") - - upgrade(fromAppId, toAppId, batchSize, fromNamespace) - } - - /* For upgrade from 0.8.0 or 0.8.1 to 0.8.2 only */ - def upgrade( - fromAppId: Int, - toAppId: Int, - batchSize: Int, - fromNamespace: String) { - - val events = Storage.getLEvents().asInstanceOf[HBLEvents] - - // Assume already run "pio app new <newapp>" (new app already created) - // TODO: check if new table empty and warn user if not - val newTable = events.getTable(toAppId) - - val newTableName = newTable.getName().getNameAsString() - println(s"Copying data from ${fromNamespace}:events for app ID ${fromAppId}" - + s" to new HBase table ${newTableName}...") - - HB_0_8_0.getByAppId( - events.client.connection, - fromNamespace, - fromAppId).grouped(batchSize).foreach { eventGroup => - val puts = eventGroup.map{ e => - val (put, rowkey) = HBEventsUtil.eventToPut(e, toAppId) - put - } - newTable.put(puts.toList) - } - - newTable.flushCommits() - newTable.close() - println("Done.") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala deleted file mode 100644 index de74d46..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/Upgrade_0_8_3.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hbase.upgrade - -import org.apache.predictionio.annotation.Experimental - -import grizzled.slf4j.Logger -import org.apache.predictionio.data.storage.Storage -import org.apache.predictionio.data.storage.DataMap -import org.apache.predictionio.data.storage.hbase.HBLEvents -import org.apache.predictionio.data.storage.hbase.HBEventsUtil - -import scala.collection.JavaConversions._ - -import scala.concurrent._ -import ExecutionContext.Implicits.global -import org.apache.predictionio.data.storage.LEvents -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import java.lang.Thread - -object CheckDistribution { - def entityType(eventClient: LEvents, appId: Int) - : Map[(String, Option[String]), Int] = { - eventClient - .find(appId = appId) - .foldLeft(Map[(String, Option[String]), Int]().withDefaultValue(0)) { - case (m, e) => { - val k = (e.entityType, e.targetEntityType) - m.updated(k, m(k) + 1) - } - } - } - - def runMain(appId: Int) { - val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] - - entityType(eventClient, appId) - .toSeq - .sortBy(-_._2) - .foreach { println } - - } - - def main(args: Array[String]) { - runMain(args(0).toInt) - } - -} - -/** :: Experimental :: */ -@Experimental -object Upgrade_0_8_3 { - val NameMap = Map( - "pio_user" -> "user", - "pio_item" -> "item") - val RevNameMap = NameMap.toSeq.map(_.swap).toMap - - val logger = Logger[this.type] - - def main(args: Array[String]) { - val fromAppId = args(0).toInt - val toAppId = args(1).toInt - - runMain(fromAppId, toAppId) - } - - def runMain(fromAppId: Int, toAppId: Int): Unit = { - upgrade(fromAppId, toAppId) - } - - - val obsEntityTypes = Set("pio_user", "pio_item") - val obsProperties = Set( - "pio_itypes", "pio_starttime", "pio_endtime", - "pio_inactive", "pio_price", "pio_rating") - - def hasPIOPrefix(eventClient: LEvents, appId: Int): Boolean = { - eventClient.find(appId = appId).filter( e => - (obsEntityTypes.contains(e.entityType) || - e.targetEntityType.map(obsEntityTypes.contains(_)).getOrElse(false) || - (!e.properties.keySet.forall(!obsProperties.contains(_))) - ) - ).hasNext - } - - def isEmpty(eventClient: LEvents, appId: Int): Boolean = - !eventClient.find(appId = appId).hasNext - - - def upgradeCopy(eventClient: LEvents, fromAppId: Int, toAppId: Int) { - val fromDist = CheckDistribution.entityType(eventClient, fromAppId) - - logger.info("FromAppId Distribution") - fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - val events = eventClient - .find(appId = fromAppId) - .zipWithIndex - .foreach { case (fromEvent, index) => { - if (index % 50000 == 0) { - // logger.info(s"Progress: $fromEvent $index") - logger.info(s"Progress: $index") - } - - - val fromEntityType = fromEvent.entityType - val toEntityType = NameMap.getOrElse(fromEntityType, fromEntityType) - - val fromTargetEntityType = fromEvent.targetEntityType - val toTargetEntityType = fromTargetEntityType - .map { et => NameMap.getOrElse(et, et) } - - val toProperties = DataMap(fromEvent.properties.fields.map { - case (k, v) => - val newK = if (obsProperties.contains(k)) { - val nK = k.stripPrefix("pio_") - logger.info(s"property ${k} will be renamed to ${nK}") - nK - } else k - (newK, v) - }) - - val toEvent = fromEvent.copy( - entityType = toEntityType, - targetEntityType = toTargetEntityType, - properties = toProperties) - - eventClient.insert(toEvent, toAppId) - }} - - - val toDist = CheckDistribution.entityType(eventClient, toAppId) - - logger.info("Recap fromAppId Distribution") - fromDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - logger.info("ToAppId Distribution") - toDist.toSeq.sortBy(-_._2).foreach { e => logger.info(e) } - - val fromGood = fromDist - .toSeq - .forall { case (k, c) => { - val (et, tet) = k - val net = NameMap.getOrElse(et, et) - val ntet = tet.map(tet => NameMap.getOrElse(tet, tet)) - val nk = (net, ntet) - val nc = toDist.getOrElse(nk, -1) - val checkMatch = (c == nc) - if (!checkMatch) { - logger.info(s"${k} doesn't match: old has ${c}. new has ${nc}.") - } - checkMatch - }} - - val toGood = toDist - .toSeq - .forall { case (k, c) => { - val (et, tet) = k - val oet = RevNameMap.getOrElse(et, et) - val otet = tet.map(tet => RevNameMap.getOrElse(tet, tet)) - val ok = (oet, otet) - val oc = fromDist.getOrElse(ok, -1) - val checkMatch = (c == oc) - if (!checkMatch) { - logger.info(s"${k} doesn't match: new has ${c}. old has ${oc}.") - } - checkMatch - }} - - if (!fromGood || !toGood) { - logger.error("Doesn't match!! There is an import error.") - } else { - logger.info("Count matches. Looks like we are good to go.") - } - } - - /* For upgrade from 0.8.2 to 0.8.3 only */ - def upgrade(fromAppId: Int, toAppId: Int) { - - val eventClient = Storage.getLEvents().asInstanceOf[HBLEvents] - - require(fromAppId != toAppId, - s"FromAppId: $fromAppId must be different from toAppId: $toAppId") - - if (hasPIOPrefix(eventClient, fromAppId)) { - require( - isEmpty(eventClient, toAppId), - s"Target appId: $toAppId is not empty. Please run " + - "`pio app data-delete <app_name>` to clean the data before upgrading") - - logger.info(s"$fromAppId isEmpty: " + isEmpty(eventClient, fromAppId)) - - upgradeCopy(eventClient, fromAppId, toAppId) - - } else { - logger.info(s"From appId: ${fromAppId} doesn't contain" - + s" obsolete entityTypes ${obsEntityTypes} or" - + s" obsolete properties ${obsProperties}." - + " No need data migration." - + s" You can continue to use appId ${fromAppId}.") - } - - logger.info("Done.") - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala deleted file mode 100644 index 08dfb01..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/HDFSModels.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hdfs - -import java.io.IOException - -import com.google.common.io.ByteStreams -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Model -import org.apache.predictionio.data.storage.Models -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path - -class HDFSModels(fs: FileSystem, config: StorageClientConfig, prefix: String) - extends Models with Logging { - - def insert(i: Model): Unit = { - try { - val fsdos = fs.create(new Path(s"$prefix${i.id}")) - fsdos.write(i.models) - fsdos.close - } catch { - case e: IOException => error(e.getMessage) - } - } - - def get(id: String): Option[Model] = { - try { - val p = new Path(s"$prefix$id") - Some(Model( - id = id, - models = ByteStreams.toByteArray(fs.open(p)))) - } catch { - case e: Throwable => - error(e.getMessage) - None - } - } - - def delete(id: String): Unit = { - val p = new Path(s"$prefix$id") - if (!fs.delete(p, false)) { - error(s"Unable to delete ${fs.makeQualified(p).toString}!") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala deleted file mode 100644 index bc57f2a..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/StorageClient.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.hdfs - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.BaseStorageClient -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path - -class StorageClient(val config: StorageClientConfig) extends BaseStorageClient - with Logging { - override val prefix = "HDFS" - val conf = new Configuration - val fs = FileSystem.get(conf) - fs.setWorkingDirectory( - new Path(config.properties.getOrElse("PATH", config.properties("HOSTS")))) - val client = fs -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala deleted file mode 100644 index a927d78..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/hdfs/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -/** HDFS implementation of storage traits, supporting model data only - * - * @group Implementation - */ -package object hdfs {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala deleted file mode 100644 index 437f8ae..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCAccessKeys.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.AccessKey -import org.apache.predictionio.data.storage.AccessKeys -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -import scala.util.Random - -/** JDBC implementation of [[AccessKeys]] */ -class JDBCAccessKeys(client: String, config: StorageClientConfig, prefix: String) - extends AccessKeys with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "accesskeys") - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - accesskey varchar(64) not null primary key, - appid integer not null, - events text)""".execute().apply() - } - - def insert(accessKey: AccessKey): Option[String] = DB localTx { implicit s => - val key = if (accessKey.key.isEmpty) generateKey else accessKey.key - val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(",")) - sql""" - insert into $tableName values( - $key, - ${accessKey.appid}, - $events)""".update().apply() - Some(key) - } - - def get(key: String): Option[AccessKey] = DB readOnly { implicit session => - sql"SELECT accesskey, appid, events FROM $tableName WHERE accesskey = $key". - map(resultToAccessKey).single().apply() - } - - def getAll(): Seq[AccessKey] = DB readOnly { implicit session => - sql"SELECT accesskey, appid, events FROM $tableName".map(resultToAccessKey).list().apply() - } - - def getByAppid(appid: Int): Seq[AccessKey] = DB readOnly { implicit session => - sql"SELECT accesskey, appid, events FROM $tableName WHERE appid = $appid". - map(resultToAccessKey).list().apply() - } - - def update(accessKey: AccessKey): Unit = DB localTx { implicit session => - val events = if (accessKey.events.isEmpty) None else Some(accessKey.events.mkString(",")) - sql""" - UPDATE $tableName SET - appid = ${accessKey.appid}, - events = $events - WHERE accesskey = ${accessKey.key}""".update().apply() - } - - def delete(key: String): Unit = DB localTx { implicit session => - sql"DELETE FROM $tableName WHERE accesskey = $key".update().apply() - } - - /** Convert JDBC results to [[AccessKey]] */ - def resultToAccessKey(rs: WrappedResultSet): AccessKey = { - AccessKey( - key = rs.string("accesskey"), - appid = rs.int("appid"), - events = rs.stringOpt("events").map(_.split(",").toSeq).getOrElse(Nil)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala deleted file mode 100644 index 17e6410..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCApps.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.App -import org.apache.predictionio.data.storage.Apps -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -/** JDBC implementation of [[Apps]] */ -class JDBCApps(client: String, config: StorageClientConfig, prefix: String) - extends Apps with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "apps") - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - id serial not null primary key, - name text not null, - description text)""".execute.apply() - } - - def insert(app: App): Option[Int] = DB localTx { implicit session => - val q = if (app.id == 0) { - sql""" - insert into $tableName (name, description) values(${app.name}, ${app.description}) - """ - } else { - sql""" - insert into $tableName values(${app.id}, ${app.name}, ${app.description}) - """ - } - Some(q.updateAndReturnGeneratedKey().apply().toInt) - } - - def get(id: Int): Option[App] = DB readOnly { implicit session => - sql"SELECT id, name, description FROM $tableName WHERE id = ${id}".map(rs => - App( - id = rs.int("id"), - name = rs.string("name"), - description = rs.stringOpt("description")) - ).single().apply() - } - - def getByName(name: String): Option[App] = DB readOnly { implicit session => - sql"SELECT id, name, description FROM $tableName WHERE name = ${name}".map(rs => - App( - id = rs.int("id"), - name = rs.string("name"), - description = rs.stringOpt("description")) - ).single().apply() - } - - def getAll(): Seq[App] = DB readOnly { implicit session => - sql"SELECT id, name, description FROM $tableName".map(rs => - App( - id = rs.int("id"), - name = rs.string("name"), - description = rs.stringOpt("description")) - ).list().apply() - } - - def update(app: App): Unit = DB localTx { implicit session => - sql""" - update $tableName set name = ${app.name}, description = ${app.description} - where id = ${app.id}""".update().apply() - } - - def delete(id: Int): Unit = DB localTx { implicit session => - sql"DELETE FROM $tableName WHERE id = $id".update().apply() - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala deleted file mode 100644 index c9aaca5..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCChannels.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Channel -import org.apache.predictionio.data.storage.Channels -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -/** JDBC implementation of [[Channels]] */ -class JDBCChannels(client: String, config: StorageClientConfig, prefix: String) - extends Channels with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "channels") - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - id serial not null primary key, - name text not null, - appid integer not null)""".execute().apply() - } - - def insert(channel: Channel): Option[Int] = DB localTx { implicit session => - val q = if (channel.id == 0) { - sql"INSERT INTO $tableName (name, appid) VALUES(${channel.name}, ${channel.appid})" - } else { - sql"INSERT INTO $tableName VALUES(${channel.id}, ${channel.name}, ${channel.appid})" - } - Some(q.updateAndReturnGeneratedKey().apply().toInt) - } - - def get(id: Int): Option[Channel] = DB localTx { implicit session => - sql"SELECT id, name, appid FROM $tableName WHERE id = $id". - map(resultToChannel).single().apply() - } - - def getByAppid(appid: Int): Seq[Channel] = DB localTx { implicit session => - sql"SELECT id, name, appid FROM $tableName WHERE appid = $appid". - map(resultToChannel).list().apply() - } - - def delete(id: Int): Unit = DB localTx { implicit session => - sql"DELETE FROM $tableName WHERE id = $id".update().apply() - } - - def resultToChannel(rs: WrappedResultSet): Channel = { - Channel( - id = rs.int("id"), - name = rs.string("name"), - appid = rs.int("appid")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala deleted file mode 100644 index 13c374d..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEngineInstances.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EngineInstance -import org.apache.predictionio.data.storage.EngineInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -/** JDBC implementation of [[EngineInstances]] */ -class JDBCEngineInstances(client: String, config: StorageClientConfig, prefix: String) - extends EngineInstances with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "engineinstances") - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - id varchar(100) not null primary key, - status text not null, - startTime timestamp DEFAULT CURRENT_TIMESTAMP, - endTime timestamp DEFAULT CURRENT_TIMESTAMP, - engineId text not null, - engineVersion text not null, - engineVariant text not null, - engineFactory text not null, - batch text not null, - env text not null, - sparkConf text not null, - datasourceParams text not null, - preparatorParams text not null, - algorithmsParams text not null, - servingParams text not null)""".execute().apply() - } - - def insert(i: EngineInstance): String = DB localTx { implicit session => - val id = java.util.UUID.randomUUID().toString - sql""" - INSERT INTO $tableName VALUES( - $id, - ${i.status}, - ${i.startTime}, - ${i.endTime}, - ${i.engineId}, - ${i.engineVersion}, - ${i.engineVariant}, - ${i.engineFactory}, - ${i.batch}, - ${JDBCUtils.mapToString(i.env)}, - ${JDBCUtils.mapToString(i.sparkConf)}, - ${i.dataSourceParams}, - ${i.preparatorParams}, - ${i.algorithmsParams}, - ${i.servingParams})""".update().apply() - id - } - - def get(id: String): Option[EngineInstance] = DB localTx { implicit session => - sql""" - SELECT - id, - status, - startTime, - endTime, - engineId, - engineVersion, - engineVariant, - engineFactory, - batch, - env, - sparkConf, - datasourceParams, - preparatorParams, - algorithmsParams, - servingParams - FROM $tableName WHERE id = $id""".map(resultToEngineInstance). - single().apply() - } - - def getAll(): Seq[EngineInstance] = DB localTx { implicit session => - sql""" - SELECT - id, - status, - startTime, - endTime, - engineId, - engineVersion, - engineVariant, - engineFactory, - batch, - env, - sparkConf, - datasourceParams, - preparatorParams, - algorithmsParams, - servingParams - FROM $tableName""".map(resultToEngineInstance).list().apply() - } - - def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] = - getCompleted(engineId, engineVersion, engineVariant).headOption - - def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] = DB localTx { implicit s => - sql""" - SELECT - id, - status, - startTime, - endTime, - engineId, - engineVersion, - engineVariant, - engineFactory, - batch, - env, - sparkConf, - datasourceParams, - preparatorParams, - algorithmsParams, - servingParams - FROM $tableName - WHERE - status = 'COMPLETED' AND - engineId = $engineId AND - engineVersion = $engineVersion AND - engineVariant = $engineVariant - ORDER BY startTime DESC""". - map(resultToEngineInstance).list().apply() - } - - def update(i: EngineInstance): Unit = DB localTx { implicit session => - sql""" - update $tableName set - status = ${i.status}, - startTime = ${i.startTime}, - endTime = ${i.endTime}, - engineId = ${i.engineId}, - engineVersion = ${i.engineVersion}, - engineVariant = ${i.engineVariant}, - engineFactory = ${i.engineFactory}, - batch = ${i.batch}, - env = ${JDBCUtils.mapToString(i.env)}, - sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, - datasourceParams = ${i.dataSourceParams}, - preparatorParams = ${i.preparatorParams}, - algorithmsParams = ${i.algorithmsParams}, - servingParams = ${i.servingParams} - where id = ${i.id}""".update().apply() - } - - def delete(id: String): Unit = DB localTx { implicit session => - sql"DELETE FROM $tableName WHERE id = $id".update().apply() - } - - /** Convert JDBC results to [[EngineInstance]] */ - def resultToEngineInstance(rs: WrappedResultSet): EngineInstance = { - EngineInstance( - id = rs.string("id"), - status = rs.string("status"), - startTime = rs.jodaDateTime("startTime"), - endTime = rs.jodaDateTime("endTime"), - engineId = rs.string("engineId"), - engineVersion = rs.string("engineVersion"), - engineVariant = rs.string("engineVariant"), - engineFactory = rs.string("engineFactory"), - batch = rs.string("batch"), - env = JDBCUtils.stringToMap(rs.string("env")), - sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), - dataSourceParams = rs.string("datasourceParams"), - preparatorParams = rs.string("preparatorParams"), - algorithmsParams = rs.string("algorithmsParams"), - servingParams = rs.string("servingParams")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala deleted file mode 100644 index 90eb5f3..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCEvaluationInstances.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.EvaluationInstance -import org.apache.predictionio.data.storage.EvaluationInstances -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -/** JDBC implementations of [[EvaluationInstances]] */ -class JDBCEvaluationInstances(client: String, config: StorageClientConfig, prefix: String) - extends EvaluationInstances with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "evaluationinstances") - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - id varchar(100) not null primary key, - status text not null, - startTime timestamp DEFAULT CURRENT_TIMESTAMP, - endTime timestamp DEFAULT CURRENT_TIMESTAMP, - evaluationClass text not null, - engineParamsGeneratorClass text not null, - batch text not null, - env text not null, - sparkConf text not null, - evaluatorResults text not null, - evaluatorResultsHTML text not null, - evaluatorResultsJSON text)""".execute().apply() - } - - def insert(i: EvaluationInstance): String = DB localTx { implicit session => - val id = java.util.UUID.randomUUID().toString - sql""" - INSERT INTO $tableName VALUES( - $id, - ${i.status}, - ${i.startTime}, - ${i.endTime}, - ${i.evaluationClass}, - ${i.engineParamsGeneratorClass}, - ${i.batch}, - ${JDBCUtils.mapToString(i.env)}, - ${JDBCUtils.mapToString(i.sparkConf)}, - ${i.evaluatorResults}, - ${i.evaluatorResultsHTML}, - ${i.evaluatorResultsJSON})""".update().apply() - id - } - - def get(id: String): Option[EvaluationInstance] = DB localTx { implicit session => - sql""" - SELECT - id, - status, - startTime, - endTime, - evaluationClass, - engineParamsGeneratorClass, - batch, - env, - sparkConf, - evaluatorResults, - evaluatorResultsHTML, - evaluatorResultsJSON - FROM $tableName WHERE id = $id - """.map(resultToEvaluationInstance).single().apply() - } - - def getAll(): Seq[EvaluationInstance] = DB localTx { implicit session => - sql""" - SELECT - id, - status, - startTime, - endTime, - evaluationClass, - engineParamsGeneratorClass, - batch, - env, - sparkConf, - evaluatorResults, - evaluatorResultsHTML, - evaluatorResultsJSON - FROM $tableName - """.map(resultToEvaluationInstance).list().apply() - } - - def getCompleted(): Seq[EvaluationInstance] = DB localTx { implicit s => - sql""" - SELECT - id, - status, - startTime, - endTime, - evaluationClass, - engineParamsGeneratorClass, - batch, - env, - sparkConf, - evaluatorResults, - evaluatorResultsHTML, - evaluatorResultsJSON - FROM $tableName - WHERE - status = 'EVALCOMPLETED' - ORDER BY starttime DESC - """.map(resultToEvaluationInstance).list().apply() - } - - def update(i: EvaluationInstance): Unit = DB localTx { implicit session => - sql""" - update $tableName set - status = ${i.status}, - startTime = ${i.startTime}, - endTime = ${i.endTime}, - evaluationClass = ${i.evaluationClass}, - engineParamsGeneratorClass = ${i.engineParamsGeneratorClass}, - batch = ${i.batch}, - env = ${JDBCUtils.mapToString(i.env)}, - sparkConf = ${JDBCUtils.mapToString(i.sparkConf)}, - evaluatorResults = ${i.evaluatorResults}, - evaluatorResultsHTML = ${i.evaluatorResultsHTML}, - evaluatorResultsJSON = ${i.evaluatorResultsJSON} - where id = ${i.id}""".update().apply() - } - - def delete(id: String): Unit = DB localTx { implicit session => - sql"DELETE FROM $tableName WHERE id = $id".update().apply() - } - - /** Convert JDBC results to [[EvaluationInstance]] */ - def resultToEvaluationInstance(rs: WrappedResultSet): EvaluationInstance = { - EvaluationInstance( - id = rs.string("id"), - status = rs.string("status"), - startTime = rs.jodaDateTime("startTime"), - endTime = rs.jodaDateTime("endTime"), - evaluationClass = rs.string("evaluationClass"), - engineParamsGeneratorClass = rs.string("engineParamsGeneratorClass"), - batch = rs.string("batch"), - env = JDBCUtils.stringToMap(rs.string("env")), - sparkConf = JDBCUtils.stringToMap(rs.string("sparkConf")), - evaluatorResults = rs.string("evaluatorResults"), - evaluatorResultsHTML = rs.string("evaluatorResultsHTML"), - evaluatorResultsJSON = rs.string("evaluatorResultsJSON")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala deleted file mode 100644 index dddef67..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.DataMap -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.LEvents -import org.apache.predictionio.data.storage.StorageClientConfig -import org.joda.time.DateTime -import org.joda.time.DateTimeZone -import org.json4s.JObject -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write -import scalikejdbc._ - -import scala.concurrent.ExecutionContext -import scala.concurrent.Future - -/** JDBC implementation of [[LEvents]] */ -class JDBCLEvents( - client: String, - config: StorageClientConfig, - namespace: String) extends LEvents with Logging { - implicit private val formats = org.json4s.DefaultFormats - - def init(appId: Int, channelId: Option[Int] = None): Boolean = { - - // To use index, it must be varchar less than 255 characters on a VARCHAR column - val useIndex = config.properties.contains("INDEX") && - config.properties("INDEX").equalsIgnoreCase("enabled") - - val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) - val entityIdIndexName = s"idx_${tableName}_ei" - val entityTypeIndexName = s"idx_${tableName}_et" - DB autoCommit { implicit session => - if (useIndex) { - SQL(s""" - create table if not exists $tableName ( - id varchar(32) not null primary key, - event varchar(255) not null, - entityType varchar(255) not null, - entityId varchar(255) not null, - targetEntityType text, - targetEntityId text, - properties text, - eventTime timestamp DEFAULT CURRENT_TIMESTAMP, - eventTimeZone varchar(50) not null, - tags text, - prId text, - creationTime timestamp DEFAULT CURRENT_TIMESTAMP, - creationTimeZone varchar(50) not null)""").execute().apply() - - // create index - SQL(s"create index $entityIdIndexName on $tableName (entityId)").execute().apply() - SQL(s"create index $entityTypeIndexName on $tableName (entityType)").execute().apply() - } else { - SQL(s""" - create table if not exists $tableName ( - id varchar(32) not null primary key, - event text not null, - entityType text not null, - entityId text not null, - targetEntityType text, - targetEntityId text, - properties text, - eventTime timestamp DEFAULT CURRENT_TIMESTAMP, - eventTimeZone varchar(50) not null, - tags text, - prId text, - creationTime timestamp DEFAULT CURRENT_TIMESTAMP, - creationTimeZone varchar(50) not null)""").execute().apply() - } - true - } - } - - def remove(appId: Int, channelId: Option[Int] = None): Boolean = - DB autoCommit { implicit session => - SQL(s""" - drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)} - """).execute().apply() - true - } - - def close(): Unit = ConnectionPool.closeAll() - - def futureInsert(event: Event, appId: Int, channelId: Option[Int])( - implicit ec: ExecutionContext): Future[String] = Future { - DB localTx { implicit session => - val id = event.eventId.getOrElse(JDBCUtils.generateId) - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" - insert into $tableName values( - $id, - ${event.event}, - ${event.entityType}, - ${event.entityId}, - ${event.targetEntityType}, - ${event.targetEntityId}, - ${write(event.properties.toJObject)}, - ${event.eventTime}, - ${event.eventTime.getZone.getID}, - ${if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else None}, - ${event.prId}, - ${event.creationTime}, - ${event.creationTime.getZone.getID} - ) - """.update().apply() - id - } - } - - def futureGet(eventId: String, appId: Int, channelId: Option[Int])( - implicit ec: ExecutionContext): Future[Option[Event]] = Future { - DB readOnly { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" - select - id, - event, - entityType, - entityId, - targetEntityType, - targetEntityId, - properties, - eventTime, - eventTimeZone, - tags, - prId, - creationTime, - creationTimeZone - from $tableName - where id = $eventId - """.map(resultToEvent).single().apply() - } - } - - def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( - implicit ec: ExecutionContext): Future[Boolean] = Future { - DB localTx { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - sql""" - delete from $tableName where id = $eventId - """.update().apply() - true - } - } - - def futureFind( - appId: Int, - channelId: Option[Int] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - limit: Option[Int] = None, - reversed: Option[Boolean] = None - )(implicit ec: ExecutionContext): Future[Iterator[Event]] = Future { - DB readOnly { implicit session => - val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) - val whereClause = sqls.toAndConditionOpt( - startTime.map(x => sqls"eventTime >= $x"), - untilTime.map(x => sqls"eventTime < $x"), - entityType.map(x => sqls"entityType = $x"), - entityId.map(x => sqls"entityId = $x"), - eventNames.map(x => - sqls.toOrConditionOpt(x.map(y => - Some(sqls"event = $y") - ): _*) - ).getOrElse(None), - targetEntityType.map(x => x.map(y => sqls"targetEntityType = $y") - .getOrElse(sqls"targetEntityType IS NULL")), - targetEntityId.map(x => x.map(y => sqls"targetEntityId = $y") - .getOrElse(sqls"targetEntityId IS NULL")) - ).map(sqls.where(_)).getOrElse(sqls"") - val orderByClause = reversed.map(x => - if (x) sqls"eventTime desc" else sqls"eventTime asc" - ).getOrElse(sqls"eventTime asc") - val limitClause = limit.map(x => - if (x < 0) sqls"" else sqls.limit(x) - ).getOrElse(sqls"") - val q = sql""" - select - id, - event, - entityType, - entityId, - targetEntityType, - targetEntityId, - properties, - eventTime, - eventTimeZone, - tags, - prId, - creationTime, - creationTimeZone - from $tableName - $whereClause - order by $orderByClause - $limitClause - """ - q.map(resultToEvent).list().apply().toIterator - } - } - - private[predictionio] def resultToEvent(rs: WrappedResultSet): Event = { - Event( - eventId = rs.stringOpt("id"), - event = rs.string("event"), - entityType = rs.string("entityType"), - entityId = rs.string("entityId"), - targetEntityType = rs.stringOpt("targetEntityType"), - targetEntityId = rs.stringOpt("targetEntityId"), - properties = rs.stringOpt("properties").map(p => - DataMap(read[JObject](p))).getOrElse(DataMap()), - eventTime = new DateTime(rs.jodaDateTime("eventTime"), - DateTimeZone.forID(rs.string("eventTimeZone"))), - tags = rs.stringOpt("tags").map(t => t.split(",").toList).getOrElse(Nil), - prId = rs.stringOpt("prId"), - creationTime = new DateTime(rs.jodaDateTime("creationTime"), - DateTimeZone.forID(rs.string("creationTimeZone"))) - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala deleted file mode 100644 index b48502a..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCModels.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Model -import org.apache.predictionio.data.storage.Models -import org.apache.predictionio.data.storage.StorageClientConfig -import scalikejdbc._ - -/** JDBC implementation of [[Models]] */ -class JDBCModels(client: String, config: StorageClientConfig, prefix: String) - extends Models with Logging { - /** Database table name for this data access object */ - val tableName = JDBCUtils.prefixTableName(prefix, "models") - - /** Determines binary column type based on JDBC driver type */ - val binaryColumnType = JDBCUtils.binaryColumnType(client) - DB autoCommit { implicit session => - sql""" - create table if not exists $tableName ( - id varchar(100) not null primary key, - models $binaryColumnType not null)""".execute().apply() - } - - def insert(i: Model): Unit = DB localTx { implicit session => - sql"insert into $tableName values(${i.id}, ${i.models})".update().apply() - } - - def get(id: String): Option[Model] = DB readOnly { implicit session => - sql"select id, models from $tableName where id = $id".map { r => - Model(id = r.string("id"), models = r.bytes("models")) - }.single().apply() - } - - def delete(id: String): Unit = DB localTx { implicit session => - sql"delete from $tableName where id = $id".execute().apply() - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala deleted file mode 100644 index 2e6ee83..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import java.sql.{DriverManager, ResultSet} - -import com.github.nscala_time.time.Imports._ -import org.apache.predictionio.data.storage.{DataMap, Event, PEvents, StorageClientConfig} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.{JdbcRDD, RDD} -import org.apache.spark.sql.{SQLContext, SaveMode} -import org.json4s.JObject -import org.json4s.native.Serialization -import scalikejdbc._ - -/** JDBC implementation of [[PEvents]] */ -class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String) extends PEvents { - @transient private implicit lazy val formats = org.json4s.DefaultFormats - def find( - appId: Int, - channelId: Option[Int] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = { - val lower = startTime.map(_.getMillis).getOrElse(0.toLong) - /** Change the default upper bound from +100 to +1 year because MySQL's - * FROM_UNIXTIME(t) will return NULL if we use +100 years. - */ - val upper = untilTime.map(_.getMillis).getOrElse((DateTime.now + 1.years).getMillis) - val par = scala.math.min( - new Duration(upper - lower).getStandardDays, - config.properties.getOrElse("PARTITIONS", "4").toLong).toInt - val entityTypeClause = entityType.map(x => s"and entityType = '$x'").getOrElse("") - val entityIdClause = entityId.map(x => s"and entityId = '$x'").getOrElse("") - val eventNamesClause = - eventNames.map("and (" + _.map(y => s"event = '$y'").mkString(" or ") + ")").getOrElse("") - val targetEntityTypeClause = targetEntityType.map( - _.map(x => s"and targetEntityType = '$x'" - ).getOrElse("and targetEntityType is null")).getOrElse("") - val targetEntityIdClause = targetEntityId.map( - _.map(x => s"and targetEntityId = '$x'" - ).getOrElse("and targetEntityId is null")).getOrElse("") - val q = s""" - select - id, - event, - entityType, - entityId, - targetEntityType, - targetEntityId, - properties, - eventTime, - eventTimeZone, - tags, - prId, - creationTime, - creationTimeZone - from ${JDBCUtils.eventTableName(namespace, appId, channelId)} - where - eventTime >= ${JDBCUtils.timestampFunction(client)}(?) and - eventTime < ${JDBCUtils.timestampFunction(client)}(?) - $entityTypeClause - $entityIdClause - $eventNamesClause - $targetEntityTypeClause - $targetEntityIdClause - """.replace("\n", " ") - new JdbcRDD( - sc, - () => { - DriverManager.getConnection( - client, - config.properties("USERNAME"), - config.properties("PASSWORD")) - }, - q, - lower / 1000, - upper / 1000, - par, - (r: ResultSet) => { - Event( - eventId = Option(r.getString("id")), - event = r.getString("event"), - entityType = r.getString("entityType"), - entityId = r.getString("entityId"), - targetEntityType = Option(r.getString("targetEntityType")), - targetEntityId = Option(r.getString("targetEntityId")), - properties = Option(r.getString("properties")).map(x => - DataMap(Serialization.read[JObject](x))).getOrElse(DataMap()), - eventTime = new DateTime(r.getTimestamp("eventTime").getTime, - DateTimeZone.forID(r.getString("eventTimeZone"))), - tags = Option(r.getString("tags")).map(x => - x.split(",").toList).getOrElse(Nil), - prId = Option(r.getString("prId")), - creationTime = new DateTime(r.getTimestamp("creationTime").getTime, - DateTimeZone.forID(r.getString("creationTimeZone")))) - }).cache() - } - - def write(events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - val sqlContext = new SQLContext(sc) - - import sqlContext.implicits._ - - val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) - - val eventTableColumns = Seq[String]( - "id" - , "event" - , "entityType" - , "entityId" - , "targetEntityType" - , "targetEntityId" - , "properties" - , "eventTime" - , "eventTimeZone" - , "tags" - , "prId" - , "creationTime" - , "creationTimeZone") - - val eventDF = events.map(x => - Event(eventId = None, event = x.event, entityType = x.entityType, - entityId = x.entityId, targetEntityType = x.targetEntityType, - targetEntityId = x.targetEntityId, properties = x.properties, - eventTime = x.eventTime, tags = x.tags, prId= x.prId, - creationTime = x.eventTime) - ) - .map { event => - (event.eventId.getOrElse(JDBCUtils.generateId) - , event.event - , event.entityType - , event.entityId - , event.targetEntityType.orNull - , event.targetEntityId.orNull - , if (!event.properties.isEmpty) Serialization.write(event.properties.toJObject) else null - , new java.sql.Timestamp(event.eventTime.getMillis) - , event.eventTime.getZone.getID - , if (event.tags.nonEmpty) Some(event.tags.mkString(",")) else null - , event.prId - , new java.sql.Timestamp(event.creationTime.getMillis) - , event.creationTime.getZone.getID) - }.toDF(eventTableColumns:_*) - - // spark version 1.4.0 or higher - val prop = new java.util.Properties - prop.setProperty("user", config.properties("USERNAME")) - prop.setProperty("password", config.properties("PASSWORD")) - eventDF.write.mode(SaveMode.Append).jdbc(client, tableName, prop) - } - - def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - - eventIds.foreachPartition{ iter => - - iter.foreach { eventId => - DB localTx { implicit session => - val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) - val table = SQLSyntax.createUnsafely(tableName) - sql""" - delete from $table where id = $eventId - """.update().apply() - true - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala deleted file mode 100644 index 3eb55ba..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCUtils.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import scalikejdbc._ - -/** JDBC related utilities */ -object JDBCUtils { - /** Extract JDBC driver type from URL - * - * @param url JDBC URL - * @return The driver type, e.g. postgresql - */ - def driverType(url: String): String = { - val capture = """jdbc:([^:]+):""".r - capture findFirstIn url match { - case Some(capture(driverType)) => driverType - case None => "" - } - } - - /** Determines binary column type from JDBC URL - * - * @param url JDBC URL - * @return Binary column type as SQLSyntax, e.g. LONGBLOB - */ - def binaryColumnType(url: String): SQLSyntax = { - driverType(url) match { - case "postgresql" => sqls"bytea" - case "mysql" => sqls"longblob" - case _ => sqls"longblob" - } - } - - /** Determines UNIX timestamp conversion function from JDBC URL - * - * @param url JDBC URL - * @return Timestamp conversion function, e.g. TO_TIMESTAMP - */ - def timestampFunction(url: String): String = { - driverType(url) match { - case "postgresql" => "to_timestamp" - case "mysql" => "from_unixtime" - case _ => "from_unixtime" - } - } - - /** Converts Map of String to String to comma-separated list of key=value - * - * @param m Map of String to String - * @return Comma-separated list, e.g. FOO=BAR,X=Y,... - */ - def mapToString(m: Map[String, String]): String = { - m.map(t => s"${t._1}=${t._2}").mkString(",") - } - - /** Inverse of mapToString - * - * @param str Comma-separated list, e.g. FOO=BAR,X=Y,... - * @return Map of String to String, e.g. Map("FOO" -> "BAR", "X" -> "Y", ...) - */ - def stringToMap(str: String): Map[String, String] = { - str.split(",").map { x => - val y = x.split("=") - y(0) -> y(1) - }.toMap[String, String] - } - - /** Generate 32-character random ID using UUID with - stripped */ - def generateId: String = java.util.UUID.randomUUID().toString.replace("-", "") - - /** Prefix a table name - * - * @param prefix Table prefix - * @param table Table name - * @return Prefixed table name - */ - def prefixTableName(prefix: String, table: String): SQLSyntax = - sqls.createUnsafely(s"${prefix}_$table") - - /** Derive event table name - * - * @param namespace Namespace of event tables - * @param appId App ID - * @param channelId Optional channel ID - * @return Full event table name - */ - def eventTableName(namespace: String, appId: Int, channelId: Option[Int]): String = - s"${namespace}_${appId}${channelId.map("_" + _).getOrElse("")}" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala deleted file mode 100644 index 661e05e..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/StorageClient.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.jdbc - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.BaseStorageClient -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException -import scalikejdbc._ - -/** JDBC implementation of [[BaseStorageClient]] */ -class StorageClient(val config: StorageClientConfig) - extends BaseStorageClient with Logging { - override val prefix = "JDBC" - - if (!config.properties.contains("URL")) { - throw new StorageClientException("The URL variable is not set!", null) - } - if (!config.properties.contains("USERNAME")) { - throw new StorageClientException("The USERNAME variable is not set!", null) - } - if (!config.properties.contains("PASSWORD")) { - throw new StorageClientException("The PASSWORD variable is not set!", null) - } - - // set max size of connection pool - val maxSize: Int = config.properties.getOrElse("CONNECTIONS", "8").toInt - val settings = ConnectionPoolSettings(maxSize = maxSize) - - ConnectionPool.singleton( - config.properties("URL"), - config.properties("USERNAME"), - config.properties("PASSWORD"), - settings) - /** JDBC connection URL. Connections are managed by ScalikeJDBC. */ - val client = config.properties("URL") -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala deleted file mode 100644 index e552e54..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/jdbc/package.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -/** JDBC implementation of storage traits, supporting meta data, event data, and - * model data - * - * @group Implementation - */ -package object jdbc {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala deleted file mode 100644 index f528af9..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/LocalFSModels.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.localfs - -import java.io.File -import java.io.FileNotFoundException -import java.io.FileOutputStream - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.Model -import org.apache.predictionio.data.storage.Models -import org.apache.predictionio.data.storage.StorageClientConfig - -import scala.io.Source - -class LocalFSModels(f: File, config: StorageClientConfig, prefix: String) - extends Models with Logging { - - def insert(i: Model): Unit = { - try { - val fos = new FileOutputStream(new File(f, s"${prefix}${i.id}")) - fos.write(i.models) - fos.close - } catch { - case e: FileNotFoundException => error(e.getMessage) - } - } - - def get(id: String): Option[Model] = { - try { - Some(Model( - id = id, - models = Source.fromFile(new File(f, s"${prefix}${id}"))( - scala.io.Codec.ISO8859).map(_.toByte).toArray)) - } catch { - case e: Throwable => - error(e.getMessage) - None - } - } - - def delete(id: String): Unit = { - val m = new File(f, s"${prefix}${id}") - if (!m.delete) error(s"Unable to delete ${m.getCanonicalPath}!") - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala deleted file mode 100644 index b9ec957..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/StorageClient.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage.localfs - -import java.io.File - -import grizzled.slf4j.Logging -import org.apache.predictionio.data.storage.BaseStorageClient -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException - -class StorageClient(val config: StorageClientConfig) extends BaseStorageClient - with Logging { - override val prefix = "LocalFS" - val f = new File( - config.properties.getOrElse("PATH", config.properties("HOSTS"))) - if (f.exists) { - if (!f.isDirectory) throw new StorageClientException( - s"${f} already exists but it is not a directory!", - null) - if (!f.canWrite) throw new StorageClientException( - s"${f} already exists but it is not writable!", - null) - } else { - if (!f.mkdirs) throw new StorageClientException( - s"${f} does not exist and automatic creation failed!", - null) - } - val client = f -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d78b3cbe/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala b/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala deleted file mode 100644 index 554ab26..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/localfs/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.data.storage - -/** Local file system implementation of storage traits, supporting model data only - * - * @group Implementation - */ -package object localfs {}
