// File: data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.storage

import org.specs2._
import org.specs2.specification.Step

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

/** Acceptance specification for `PEvents` implementations.
  *
  * The same battery of examples (`events`) is run against the HBase-backed
  * and the JDBC-backed data objects. Fixture events come from the
  * [[TestEvents]] trait; they are inserted through an `LEvents` client and
  * read back / aggregated / extended through a `PEvents` client using a local
  * Spark context.
  */
class PEventsSpec extends Specification with TestEvents {

  // Clear these Spark properties before the local context is created.
  System.clearProperty("spark.driver.port")
  System.clearProperty("spark.hostPort")
  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")

  val appId = 1
  val channelId = 6
  // hashCode of this spec instance is appended to the storage name.
  val dbName = s"test_pio_storage_events_$hashCode"

  def hbLocal = Storage.getDataObject[LEvents](StorageTestUtils.hbaseSourceName, dbName)

  def hbPar = Storage.getDataObject[PEvents](StorageTestUtils.hbaseSourceName, dbName)

  def jdbcLocal = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)

  def jdbcPar = Storage.getDataObject[PEvents](StorageTestUtils.jdbcSourceName, dbName)

  /** Stops the Spark context created by this specification. */
  def stopSpark = sc.stop()

  def is = s2"""

  PredictionIO Storage PEvents Specification

    PEvents can be implemented by:
    - HBPEvents ${hbPEvents}
    - JDBCPEvents ${jdbcPEvents}
    - (stop Spark) ${Step(stopSpark)}

  """

  def hbPEvents = sequential ^ s2"""

    HBPEvents should
    - behave like any PEvents implementation ${events(hbLocal, hbPar)}
    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}

  """

  def jdbcPEvents = sequential ^ s2"""

    JDBCPEvents should
    - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}

  """

  /** The shared examples: initialise storage, insert fixtures through the
    * local client, then exercise find / aggregate / write on the parallel
    * client, for both the default channel and `channelId`.
    */
  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""

    - (init test) ${initTest(localEventClient)}
    - (insert test events) ${insertTestEvents(localEventClient)}
    find in default ${find(parEventClient)}
    find in channel ${findChannel(parEventClient)}
    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
    write to default ${write(parEventClient)}
    write to channel ${writeChannel(parEventClient)}

  """

  /* setup */

  // events from TestEvents trait
  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)

  /** Initialises storage for the app's default channel and for `channelId`. */
  def initTest(localEventClient: LEvents) = {
    localEventClient.init(appId)
    localEventClient.init(appId, Some(channelId))
  }

  /** Inserts both fixture lists through the local client. */
  def insertTestEvents(localEventClient: LEvents) = {
    // default channel
    listOfEvents.foreach(event => localEventClient.insert(event, appId))
    // insert to channel
    listOfEventsChannel.foreach(event => localEventClient.insert(event, appId, Some(channelId)))
    success
  }

  /** Collects an event RDD to the driver and blanks out `eventId` on every
    * element so results can be compared with the fixtures (which carry no id).
    */
  private def withoutEventIds(eventRDD: RDD[Event]): List[Event] =
    eventRDD.collect.toList.map(_.copy(eventId = None))

  /* following are tests */

  def find(parEventClient: PEvents) = {
    val stored = parEventClient.find(appId = appId)(sc)

    withoutEventIds(stored) must containTheSameElementsAs(listOfEvents)
  }

  def findChannel(parEventClient: PEvents) = {
    val stored = parEventClient.find(
      appId = appId,
      channelId = Some(channelId)
    )(sc)

    withoutEventIds(stored) must containTheSameElementsAs(listOfEventsChannel)
  }

  def aggregateUserProperties(parEventClient: PEvents) = {
    val aggregatedRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
      appId = appId,
      entityType = "user"
    )(sc)
    val aggregated: Map[String, PropertyMap] = aggregatedRDD.collectAsMap.toMap

    aggregated must beEqualTo(Map(
      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
    ))
  }

  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
    val aggregatedRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
      appId = appId,
      channelId = Some(channelId),
      entityType = "user"
    )(sc)
    val aggregated: Map[String, PropertyMap] = aggregatedRDD.collectAsMap.toMap

    aggregated must beEqualTo(Map(
      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
    ))
  }

  def write(parEventClient: PEvents) = {
    val extraEvents = List(r5, r6)
    parEventClient.write(sc.parallelize(extraEvents), appId)(sc)

    // read back
    val stored = parEventClient.find(appId = appId)(sc)

    withoutEventIds(stored) must containTheSameElementsAs(listOfEvents ++ extraEvents)
  }

  def writeChannel(parEventClient: PEvents) = {
    val extraEvents = List(r1, r5, r6)
    parEventClient.write(sc.parallelize(extraEvents), appId, Some(channelId))(sc)

    // read back
    val stored = parEventClient.find(
      appId = appId,
      channelId = Some(channelId)
    )(sc)

    withoutEventIds(stored) must containTheSameElementsAs(listOfEventsChannel ++ extraEvents)
  }

}
// File: data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.storage

import io.prediction.data.storage.hbase.HBLEvents
import scalikejdbc._

/** Storage source names and cleanup helpers shared by the storage specs. */
object StorageTestUtils {
  val hbaseSourceName = "HBASE"
  val jdbcSourceName = "PGSQL"

  /** Removes every table in `namespace` and then the namespace itself.
    *
    * @param namespace name passed to `Storage.getDataObject` and to the
    *                  HBase admin calls
    */
  def dropHBaseNamespace(namespace: String): Unit = {
    // The admin handle is only reachable through the HBase-specific class.
    val hbEvents = Storage
      .getDataObject[LEvents](hbaseSourceName, namespace)
      .asInstanceOf[HBLEvents]
    val hbAdmin = hbEvents.client.admin

    // Each table is disabled first, then deleted.
    for (table <- hbAdmin.listTableNamesByNamespace(namespace)) {
      hbAdmin.disableTable(table)
      hbAdmin.deleteTable(table)
    }

    //Only empty namespaces (no tables) can be removed.
    hbAdmin.deleteNamespace(namespace)
  }

  /** Issues `drop table <table>` in an auto-commit session.
    *
    * The table name is interpolated directly into the SQL text, so it must
    * come from trusted test code only.
    *
    * @param table name of the table to drop
    */
  def dropJDBCTable(table: String): Unit = DB.autoCommit { implicit session =>
    SQL(s"drop table $table").execute().apply()
  }
}
// File: data/src/test/scala/io/prediction/data/storage/TestEvents.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.storage

import org.joda.time.DateTime
import org.joda.time.DateTimeZone

/** Fixture events shared by the storage specifications.
  *
  * Three groups are defined:
  *  - `u1*`, `u2*`, `u3*`: "$set" / "$unset" / "$delete" events on "user"
  *    entities, together with the JSON (`u1`, `u2`, `u3`) that results from
  *    applying each user's set/unset events in event-time order, and the
  *    first/last event times;
  *  - `r1` .. `r6`: unrelated events with assorted optional fields;
  *  - `tz1` .. `tz3`: events whose event time is the same instant expressed
  *    in different time zones.
  *
  * NOTE(review): several JSON literals below end an object with a trailing
  * comma — presumably tolerated by DataMap's parser; confirm before tidying.
  */
trait TestEvents {

  // Event time of the first event of each user; later events are offset in days.
  val u1BaseTime = new DateTime(654321)
  val u2BaseTime = new DateTime(6543210)
  val u3BaseTime = new DateTime(6543410)

  // u1 events
  // initial $set: a = 1, b = "value2", d = [1, 2, 3]
  val u1e1 = Event(
    event = "$set",
    entityType = "user",
    entityId = "u1",
    properties = DataMap(
      """{
        "a" : 1,
        "b" : "value2",
        "d" : [1, 2, 3],
      }"""),
    eventTime = u1BaseTime
  )

  // +1 day: overwrite a
  val u1e2 = u1e1.copy(
    event = "$set",
    properties = DataMap("""{"a" : 2}"""),
    eventTime = u1BaseTime.plusDays(1)
  )

  // +2 days: overwrite b
  val u1e3 = u1e1.copy(
    event = "$set",
    properties = DataMap("""{"b" : "value4"}"""),
    eventTime = u1BaseTime.plusDays(2)
  )

  // +3 days: remove b
  val u1e4 = u1e1.copy(
    event = "$unset",
    properties = DataMap("""{"b" : null}"""),
    eventTime = u1BaseTime.plusDays(3)
  )

  // +4 days: add e
  val u1e5 = u1e1.copy(
    event = "$set",
    properties = DataMap("""{"e" : "new"}"""),
    eventTime = u1BaseTime.plusDays(4)
  )

  // Time of u1e5 and the properties left after u1e1..u1e5 (a overwritten, b removed, e added).
  val u1LastTime = u1BaseTime.plusDays(4)
  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""

  // delete event for u1
  val u1ed = u1e1.copy(
    event = "$delete",
    properties = DataMap(),
    eventTime = u1BaseTime.plusDays(5)
  )

  // u2 events
  // initial $set: a = 21, b = "value12", d = [7, 5, 6]
  val u2e1 = Event(
    event = "$set",
    entityType = "user",
    entityId = "u2",
    properties = DataMap(
      """{
        "a" : 21,
        "b" : "value12",
        "d" : [7, 5, 6],
      }"""),
    eventTime = u2BaseTime
  )

  // +1 day: remove a
  val u2e2 = u2e1.copy(
    event = "$unset",
    properties = DataMap("""{"a" : null}"""),
    eventTime = u2BaseTime.plusDays(1)
  )

  // +2 days: overwrite b, add g
  val u2e3 = u2e1.copy(
    event = "$set",
    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
    eventTime = u2BaseTime.plusDays(2)
  )

  // Time of u2e3 and the properties left after u2e1..u2e3.
  val u2LastTime = u2BaseTime.plusDays(2)
  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""

  // u3 events
  // initial $set: a = 22, b = "value13", d = [5, 6, 1]
  val u3e1 = Event(
    event = "$set",
    entityType = "user",
    entityId = "u3",
    properties = DataMap(
      """{
        "a" : 22,
        "b" : "value13",
        "d" : [5, 6, 1],
      }"""),
    eventTime = u3BaseTime
  )

  // +1 day: remove a
  val u3e2 = u3e1.copy(
    event = "$unset",
    properties = DataMap("""{"a" : null}"""),
    eventTime = u3BaseTime.plusDays(1)
  )

  // +2 days: overwrite b and d, add f
  val u3e3 = u3e1.copy(
    event = "$set",
    properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
    eventTime = u3BaseTime.plusDays(2)
  )

  // Time of u3e3 and the properties left after u3e1..u3e3.
  val u3LastTime = u3BaseTime.plusDays(2)
  val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""

  // some random events
  // r1: target entity, mixed-type properties, prId; event time is taken when the trait is initialised
  val r1 = Event(
    event = "my_event",
    entityType = "my_entity_type",
    entityId = "my_entity_id",
    targetEntityType = Some("my_target_entity_type"),
    targetEntityId = Some("my_target_entity_id"),
    properties = DataMap(
      """{
        "prop1" : 1,
        "prop2" : "value2",
        "prop3" : [1, 2, 3],
        "prop4" : true,
        "prop5" : ["a", "b", "c"],
        "prop6" : 4.56
      }"""
    ),
    eventTime = DateTime.now,
    prId = Some("my_prid")
  )
  // r2: only the three required-looking fields are given
  val r2 = Event(
    event = "my_event2",
    entityType = "my_entity_type2",
    entityId = "my_entity_id2"
  )
  // r3: same entity and target as r1, different properties, no explicit eventTime
  val r3 = Event(
    event = "my_event3",
    entityType = "my_entity_type",
    entityId = "my_entity_id",
    targetEntityType = Some("my_target_entity_type"),
    targetEntityId = Some("my_target_entity_id"),
    properties = DataMap(
      """{
        "propA" : 1.2345,
        "propB" : "valueB",
      }"""
    ),
    prId = Some("my_prid")
  )
  // r4..r6: distinct entities/targets, no prId; event time is taken when the trait is initialised
  val r4 = Event(
    event = "my_event4",
    entityType = "my_entity_type4",
    entityId = "my_entity_id4",
    targetEntityType = Some("my_target_entity_type4"),
    targetEntityId = Some("my_target_entity_id4"),
    properties = DataMap(
      """{
        "prop1" : 1,
        "prop2" : "value2",
        "prop3" : [1, 2, 3],
        "prop4" : true,
        "prop5" : ["a", "b", "c"],
        "prop6" : 4.56
      }"""),
    eventTime = DateTime.now
  )
  val r5 = Event(
    event = "my_event5",
    entityType = "my_entity_type5",
    entityId = "my_entity_id5",
    targetEntityType = Some("my_target_entity_type5"),
    targetEntityId = Some("my_target_entity_id5"),
    properties = DataMap(
      """{
        "prop1" : 1,
        "prop2" : "value2",
        "prop3" : [1, 2, 3],
        "prop4" : true,
        "prop5" : ["a", "b", "c"],
        "prop6" : 4.56
      }"""
    ),
    eventTime = DateTime.now
  )
  val r6 = Event(
    event = "my_event6",
    entityType = "my_entity_type6",
    entityId = "my_entity_id6",
    targetEntityType = Some("my_target_entity_type6"),
    targetEntityId = Some("my_target_entity_id6"),
    properties = DataMap(
      """{
        "prop1" : 6,
        "prop2" : "value2",
        "prop3" : [6, 7, 8],
        "prop4" : true,
        "prop5" : ["a", "b", "c"],
        "prop6" : 4.56
      }"""
    ),
    eventTime = DateTime.now
  )

  // timezone
  // All three use the instant 12345678 ms, expressed at offsets -08:00, +02:00 and +08:00.
  val tz1 = Event(
    event = "my_event",
    entityType = "my_entity_type",
    entityId = "my_entity_id0",
    targetEntityType = Some("my_target_entity_type"),
    targetEntityId = Some("my_target_entity_id"),
    properties = DataMap(
      """{
        "prop1" : 1,
        "prop2" : "value2",
        "prop3" : [1, 2, 3],
        "prop4" : true,
        "prop5" : ["a", "b", "c"],
        "prop6" : 4.56
      }"""
    ),
    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
    prId = Some("my_prid")
  )

  val tz2 = Event(
    event = "my_event",
    entityType = "my_entity_type",
    entityId = "my_entity_id1",
    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
    prId = Some("my_prid")
  )

  val tz3 = Event(
    event = "my_event",
    entityType = "my_entity_type",
    entityId = "my_entity_id2",
    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
    prId = Some("my_prid")
  )

}
// File: data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.webhooks

import org.specs2.execute.Result
import org.specs2.mutable._

import org.json4s.JObject
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

/** Shared assertions for webhook connector specs.
  *
  * Both `check` overloads run a connector on some input, serialise the
  * connector's output, parse it back, and compare its top-level fields with
  * the top-level fields of an expected event JSON (order-insensitive).
  */
trait ConnectorTestUtil extends Specification {

  implicit val formats = DefaultFormats

  /** Parses `json` and requires the top-level value to be a JSON object.
    *
    * Fails with a descriptive error instead of a bare `ClassCastException`
    * when a fixture (or a connector's output) is not an object.
    */
  private def parseJsonObject(json: String): JObject = parse(json) match {
    case obj: JObject => obj
    case other =>
      throw new IllegalArgumentException(s"Expected a JSON object but parsed: $other")
  }

  /** Checks a `JsonConnector` conversion.
    *
    * @param connector connector under test
    * @param original  webhook payload as JSON text; must be a JSON object
    * @param event     expected event as JSON text; must be a JSON object
    * @return the result of comparing the converted fields with the expected ones
    */
  def check(connector: JsonConnector, original: String, event: String): Result = {
    val payload = parseJsonObject(original)
    val expectedJson = parseJsonObject(event)
    // write and parse back to discard any JNothing field
    val converted = parseJsonObject(write(connector.toEventJson(payload)))
    converted.obj must containTheSameElementsAs(expectedJson.obj)
  }

  /** Checks a `FormConnector` conversion.
    *
    * @param connector connector under test
    * @param original  webhook form fields
    * @param event     expected event as JSON text; must be a JSON object
    * @return the result of comparing the converted fields with the expected ones
    */
  def check(connector: FormConnector, original: Map[String, String], event: String) = {
    val expectedJson = parseJsonObject(event)
    // write and parse back to discard any JNothing field
    val converted = parseJsonObject(write(connector.toEventJson(original)))
    converted.obj must containTheSameElementsAs(expectedJson.obj)
  }
}
// File: data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.webhooks.exampleform

import io.prediction.data.webhooks.ConnectorTestUtil

import org.specs2.mutable._

/** Exercises ExampleFormConnector: each example feeds a form-field map to the
  * connector through `check` and compares the outcome with an expected event
  * JSON document.
  *
  * NOTE(review): several members in the expected-JSON literals are not
  * comma-separated, and one object ends with a trailing comma — presumably
  * the parser used by `check` tolerates this; confirm before tidying them.
  */
class ExampleFormConnectorSpec extends Specification with ConnectorTestUtil {

  "ExampleFormConnector" should {

    "convert userAction to Event JSON" in {
      // form fields as delivered by the webhook
      val formFields = Map(
        "type" -> "userAction",
        "userId" -> "as34smg4",
        "event" -> "do_something",
        "context[ip]" -> "24.5.68.47", // optional
        "context[prop1]" -> "2.345", // optional
        "context[prop2]" -> "value1", // optional
        "anotherProperty1" -> "100",
        "anotherProperty2"-> "optional1", // optional
        "timestamp" -> "2015-01-02T00:30:12.984Z"
      )

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something",
          "entityType": "user",
          "entityId": "as34smg4",
          "properties": {
            "context": {
              "ip": "24.5.68.47",
              "prop1": 2.345
              "prop2": "value1"
            },
            "anotherProperty1": 100,
            "anotherProperty2": "optional1"
          }
          "eventTime": "2015-01-02T00:30:12.984Z"
        }
      """

      check(ExampleFormConnector, formFields, expectedEventJson)
    }

    "convert userAction without optional fields to Event JSON" in {
      // form fields as delivered by the webhook (no optional entries)
      val formFields = Map(
        "type" -> "userAction",
        "userId" -> "as34smg4",
        "event" -> "do_something",
        "anotherProperty1" -> "100",
        "timestamp" -> "2015-01-02T00:30:12.984Z"
      )

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something",
          "entityType": "user",
          "entityId": "as34smg4",
          "properties": {
            "anotherProperty1": 100,
          }
          "eventTime": "2015-01-02T00:30:12.984Z"
        }
      """

      check(ExampleFormConnector, formFields, expectedEventJson)
    }

    "convert userActionItem to Event JSON" in {
      // form fields as delivered by the webhook
      val formFields = Map(
        "type" -> "userActionItem",
        "userId" -> "as34smg4",
        "event" -> "do_something_on",
        "itemId" -> "kfjd312bc",
        "context[ip]" -> "1.23.4.56",
        "context[prop1]" -> "2.345",
        "context[prop2]" -> "value1",
        "anotherPropertyA" -> "4.567", // optional
        "anotherPropertyB" -> "false", // optional
        "timestamp" -> "2015-01-15T04:20:23.567Z"
      )

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something_on",
          "entityType": "user",
          "entityId": "as34smg4",
          "targetEntityType": "item",
          "targetEntityId": "kfjd312bc"
          "properties": {
            "context": {
              "ip": "1.23.4.56",
              "prop1": 2.345
              "prop2": "value1"
            },
            "anotherPropertyA": 4.567
            "anotherPropertyB": false
          }
          "eventTime": "2015-01-15T04:20:23.567Z"
        }
      """

      check(ExampleFormConnector, formFields, expectedEventJson)
    }

    "convert userActionItem without optional fields to Event JSON" in {
      // form fields as delivered by the webhook (no optional entries)
      val formFields = Map(
        "type" -> "userActionItem",
        "userId" -> "as34smg4",
        "event" -> "do_something_on",
        "itemId" -> "kfjd312bc",
        "context[ip]" -> "1.23.4.56",
        "context[prop1]" -> "2.345",
        "context[prop2]" -> "value1",
        "timestamp" -> "2015-01-15T04:20:23.567Z"
      )

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something_on",
          "entityType": "user",
          "entityId": "as34smg4",
          "targetEntityType": "item",
          "targetEntityId": "kfjd312bc"
          "properties": {
            "context": {
              "ip": "1.23.4.56",
              "prop1": 2.345
              "prop2": "value1"
            }
          }
          "eventTime": "2015-01-15T04:20:23.567Z"
        }
      """

      check(ExampleFormConnector, formFields, expectedEventJson)
    }

  }
}
// File: data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.webhooks.examplejson

import io.prediction.data.webhooks.ConnectorTestUtil

import org.specs2.mutable._

/** Exercises ExampleJsonConnector: each example feeds a JSON payload to the
  * connector through `check` and compares the outcome with an expected event
  * JSON document.
  *
  * NOTE(review): several members in the JSON literals are not
  * comma-separated, and one object ends with a trailing comma — presumably
  * the parser used by `check` tolerates this; confirm before tidying them.
  */
class ExampleJsonConnectorSpec extends Specification with ConnectorTestUtil {

  "ExampleJsonConnector" should {

    "convert userAction to Event JSON" in {
      // JSON payload as delivered by the webhook
      val payload = """
        {
          "type": "userAction"
          "userId": "as34smg4",
          "event": "do_something",
          "context": {
            "ip": "24.5.68.47",
            "prop1": 2.345
            "prop2": "value1"
          },
          "anotherProperty1": 100,
          "anotherProperty2": "optional1",
          "timestamp": "2015-01-02T00:30:12.984Z"
        }
      """

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something",
          "entityType": "user",
          "entityId": "as34smg4",
          "properties": {
            "context": {
              "ip": "24.5.68.47",
              "prop1": 2.345
              "prop2": "value1"
            },
            "anotherProperty1": 100,
            "anotherProperty2": "optional1"
          }
          "eventTime": "2015-01-02T00:30:12.984Z"
        }
      """

      check(ExampleJsonConnector, payload, expectedEventJson)
    }

    "convert userAction without optional field to Event JSON" in {
      // JSON payload as delivered by the webhook (no optional members)
      val payload = """
        {
          "type": "userAction"
          "userId": "as34smg4",
          "event": "do_something",
          "anotherProperty1": 100,
          "timestamp": "2015-01-02T00:30:12.984Z"
        }
      """

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something",
          "entityType": "user",
          "entityId": "as34smg4",
          "properties": {
            "anotherProperty1": 100,
          }
          "eventTime": "2015-01-02T00:30:12.984Z"
        }
      """

      check(ExampleJsonConnector, payload, expectedEventJson)
    }

    "convert userActionItem to Event JSON" in {
      // JSON payload as delivered by the webhook
      val payload = """
        {
          "type": "userActionItem"
          "userId": "as34smg4",
          "event": "do_something_on",
          "itemId": "kfjd312bc",
          "context": {
            "ip": "1.23.4.56",
            "prop1": 2.345
            "prop2": "value1"
          },
          "anotherPropertyA": 4.567
          "anotherPropertyB": false
          "timestamp": "2015-01-15T04:20:23.567Z"
        }
      """

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something_on",
          "entityType": "user",
          "entityId": "as34smg4",
          "targetEntityType": "item",
          "targetEntityId": "kfjd312bc"
          "properties": {
            "context": {
              "ip": "1.23.4.56",
              "prop1": 2.345
              "prop2": "value1"
            },
            "anotherPropertyA": 4.567
            "anotherPropertyB": false
          }
          "eventTime": "2015-01-15T04:20:23.567Z"
        }
      """

      check(ExampleJsonConnector, payload, expectedEventJson)
    }

    "convert userActionItem without optional fields to Event JSON" in {
      // JSON payload as delivered by the webhook (no optional members)
      val payload = """
        {
          "type": "userActionItem"
          "userId": "as34smg4",
          "event": "do_something_on",
          "itemId": "kfjd312bc",
          "context": {
            "ip": "1.23.4.56",
            "prop1": 2.345
            "prop2": "value1"
          }
          "timestamp": "2015-01-15T04:20:23.567Z"
        }
      """

      // event JSON the connector is expected to produce
      val expectedEventJson = """
        {
          "event": "do_something_on",
          "entityType": "user",
          "entityId": "as34smg4",
          "targetEntityType": "item",
          "targetEntityId": "kfjd312bc"
          "properties": {
            "context": {
              "ip": "1.23.4.56",
              "prop1": 2.345
              "prop2": "value1"
            }
          }
          "eventTime": "2015-01-15T04:20:23.567Z"
        }
      """

      check(ExampleJsonConnector, payload, expectedEventJson)
    }

  }

}
// File: data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.data.webhooks.mailchimp

import io.prediction.data.webhooks.ConnectorTestUtil

import org.specs2.mutable._

/** Exercises MailChimpConnector: one example per webhook type (subscribe,
  * unsubscribe, profile, upemail, cleaned, campaign). Each feeds a form-field
  * map to the connector through `check` and compares the outcome with an
  * expected event JSON document.
  *
  * NOTE(review): every e-mail literal in this copy reads `[email protected]`,
  * which looks like an archive redaction placeholder rather than the real
  * fixture value — confirm against the upstream file.
  * NOTE(review): inside the `merges` objects, "LNAME" and "INTERESTS" are not
  * comma-separated — presumably tolerated by the parser used by `check`;
  * confirm before tidying.
  */
class MailChimpConnectorSpec extends Specification with ConnectorTestUtil {

  // TODO: test other events
  // TODO: test different optional fields

  "MailChimpConnector" should {

    "convert subscribe to event JSON" in {

      val formFields = Map(
        "type" -> "subscribe",
        "fired_at" -> "2009-03-26 21:35:57",
        "data[id]" -> "8a25ff1d98",
        "data[list_id]" -> "a6b5da1054",
        "data[email]" -> "[email protected]",
        "data[email_type]" -> "html",
        "data[merges][EMAIL]" -> "[email protected]",
        "data[merges][FNAME]" -> "MailChimp",
        "data[merges][LNAME]" -> "API",
        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
        "data[ip_opt]" -> "10.20.10.30",
        "data[ip_signup]" -> "10.20.10.30"
      )

      val expectedEventJson = """
        {
          "event" : "subscribe",
          "entityType" : "user",
          "entityId" : "8a25ff1d98",
          "targetEntityType" : "list",
          "targetEntityId" : "a6b5da1054",
          "properties" : {
            "email" : "[email protected]",
            "email_type" : "html",
            "merges" : {
              "EMAIL" : "[email protected]",
              "FNAME" : "MailChimp",
              "LNAME" : "API"
              "INTERESTS" : "Group1,Group2"
            },
            "ip_opt" : "10.20.10.30",
            "ip_signup" : "10.20.10.30"
          },
          "eventTime" : "2009-03-26T21:35:57.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

    // unsubscribe webhook -> event JSON
    "convert unsubscribe to event JSON" in {

      val formFields = Map(
        "type" -> "unsubscribe",
        "fired_at" -> "2009-03-26 21:40:57",
        "data[action]" -> "unsub",
        "data[reason]" -> "manual",
        "data[id]" -> "8a25ff1d98",
        "data[list_id]" -> "a6b5da1054",
        "data[email]" -> "[email protected]",
        "data[email_type]" -> "html",
        "data[merges][EMAIL]" -> "[email protected]",
        "data[merges][FNAME]" -> "MailChimp",
        "data[merges][LNAME]" -> "API",
        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
        "data[ip_opt]" -> "10.20.10.30",
        "data[campaign_id]" -> "cb398d21d2"
      )

      val expectedEventJson = """
        {
          "event" : "unsubscribe",
          "entityType" : "user",
          "entityId" : "8a25ff1d98",
          "targetEntityType" : "list",
          "targetEntityId" : "a6b5da1054",
          "properties" : {
            "action" : "unsub",
            "reason" : "manual",
            "email" : "[email protected]",
            "email_type" : "html",
            "merges" : {
              "EMAIL" : "[email protected]",
              "FNAME" : "MailChimp",
              "LNAME" : "API"
              "INTERESTS" : "Group1,Group2"
            },
            "ip_opt" : "10.20.10.30",
            "campaign_id" : "cb398d21d2"
          },
          "eventTime" : "2009-03-26T21:40:57.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

    // profile-update webhook -> event JSON
    "convert profile update to event JSON" in {

      val formFields = Map(
        "type" -> "profile",
        "fired_at" -> "2009-03-26 21:31:21",
        "data[id]" -> "8a25ff1d98",
        "data[list_id]" -> "a6b5da1054",
        "data[email]" -> "[email protected]",
        "data[email_type]" -> "html",
        "data[merges][EMAIL]" -> "[email protected]",
        "data[merges][FNAME]" -> "MailChimp",
        "data[merges][LNAME]" -> "API",
        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
        "data[ip_opt]" -> "10.20.10.30"
      )

      val expectedEventJson = """
        {
          "event" : "profile",
          "entityType" : "user",
          "entityId" : "8a25ff1d98",
          "targetEntityType" : "list",
          "targetEntityId" : "a6b5da1054",
          "properties" : {
            "email" : "[email protected]",
            "email_type" : "html",
            "merges" : {
              "EMAIL" : "[email protected]",
              "FNAME" : "MailChimp",
              "LNAME" : "API"
              "INTERESTS" : "Group1,Group2"
            },
            "ip_opt" : "10.20.10.30"
          },
          "eventTime" : "2009-03-26T21:31:21.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

    // email-update webhook -> event JSON
    "convert email update to event JSON" in {

      val formFields = Map(
        "type" -> "upemail",
        "fired_at" -> "2009-03-26 22:15:09",
        "data[list_id]" -> "a6b5da1054",
        "data[new_id]" -> "51da8c3259",
        "data[new_email]" -> "[email protected]",
        "data[old_email]" -> "[email protected]"
      )

      val expectedEventJson = """
        {
          "event" : "upemail",
          "entityType" : "user",
          "entityId" : "51da8c3259",
          "targetEntityType" : "list",
          "targetEntityId" : "a6b5da1054",
          "properties" : {
            "new_email" : "[email protected]",
            "old_email" : "[email protected]"
          },
          "eventTime" : "2009-03-26T22:15:09.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

    // cleaned-email webhook -> event JSON
    "convert cleaned email to event JSON" in {

      val formFields = Map(
        "type" -> "cleaned",
        "fired_at" -> "2009-03-26 22:01:00",
        "data[list_id]" -> "a6b5da1054",
        "data[campaign_id]" -> "4fjk2ma9xd",
        "data[reason]" -> "hard",
        "data[email]" -> "[email protected]"
      )

      val expectedEventJson = """
        {
          "event" : "cleaned",
          "entityType" : "list",
          "entityId" : "a6b5da1054",
          "properties" : {
            "campaignId" : "4fjk2ma9xd",
            "reason" : "hard",
            "email" : "[email protected]"
          },
          "eventTime" : "2009-03-26T22:01:00.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

    // campaign-sending-status webhook -> event JSON
    "convert campaign sending status to event JSON" in {

      val formFields = Map(
        "type" -> "campaign",
        "fired_at" -> "2009-03-26 22:15:09",
        "data[id]" -> "5aa2102003",
        "data[subject]" -> "Test Campaign Subject",
        "data[status]" -> "sent",
        "data[reason]" -> "",
        "data[list_id]" -> "a6b5da1054"
      )

      val expectedEventJson = """
        {
          "event" : "campaign",
          "entityType" : "campaign",
          "entityId" : "5aa2102003",
          "targetEntityType" : "list",
          "targetEntityId" : "a6b5da1054",
          "properties" : {
            "subject" : "Test Campaign Subject",
            "status" : "sent",
            "reason" : ""
          },
          "eventTime" : "2009-03-26T22:15:09.000Z"
        }
      """

      check(MailChimpConnector, formFields, expectedEventJson)
    }

  }
}
- */ - -package io.prediction.data.webhooks.segmentio - -import io.prediction.data.webhooks.ConnectorTestUtil - -import org.specs2.mutable._ - -class SegmentIOConnectorSpec extends Specification with ConnectorTestUtil { - - // TODO: test different optional fields - - val commonFields = - s""" - | "anonymous_id": "id", - | "sent_at": "sendAt", - | "version": "2", - """.stripMargin - - "SegmentIOConnector" should { - - "convert group with context to event JSON" in { - val context = - """ - | "context": { - | "app": { - | "name": "InitechGlobal", - | "version": "545", - | "build": "3.0.1.545" - | }, - | "campaign": { - | "name": "TPS Innovation Newsletter", - | "source": "Newsletter", - | "medium": "email", - | "term": "tps reports", - | "content": "image link" - | }, - | "device": { - | "id": "B5372DB0-C21E-11E4-8DFC-AA07A5B093DB", - | "advertising_id": "7A3CBEA0-BDF5-11E4-8DFC-AA07A5B093DB", - | "ad_tracking_enabled": true, - | "manufacturer": "Apple", - | "model": "iPhone7,2", - | "name": "maguro", - | "type": "ios", - | "token": "ff15bc0c20c4aa6cd50854ff165fd265c838e5405bfeb9571066395b8c9da449" - | }, - | "ip": "8.8.8.8", - | "library": { - | "name": "analytics-ios", - | "version": "1.8.0" - | }, - | "network": { - | "bluetooth": false, - | "carrier": "T-Mobile NL", - | "cellular": true, - | "wifi": false - | }, - | "location": { - | "city": "San Francisco", - | "country": "United States", - | "latitude": 40.2964197, - | "longitude": -76.9411617, - | "speed": 0 - | }, - | "os": { - | "name": "iPhone OS", - | "version": "8.1.3" - | }, - | "referrer": { - | "id": "ABCD582CDEFFFF01919", - | "type": "dataxu" - | }, - | "screen": { - | "width": 320, - | "height": 568, - | "density": 2 - | }, - | "timezone": "Europe/Amsterdam", - | "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)" - | } - """.stripMargin - - val group = - s""" - |{ $commonFields - | "type": "group", - | "group_id": "groupId", - | "user_id": "userIdValue", - | "timestamp" : 
"2012-12-02T00:30:08.276Z", - | "traits": { - | "name": "groupName", - | "employees": 329, - | }, - | $context - |} - """.stripMargin - - val expected = - s""" - |{ - | "event": "group", - | "entityType": "user", - | "entityId": "userIdValue", - | "properties": { - | $context, - | "group_id": "groupId", - | "traits": { - | "name": "groupName", - | "employees": 329 - | }, - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, group, expected) - } - - "convert group to event JSON" in { - val group = - s""" - |{ $commonFields - | "type": "group", - | "group_id": "groupId", - | "user_id": "userIdValue", - | "timestamp" : "2012-12-02T00:30:08.276Z", - | "traits": { - | "name": "groupName", - | "employees": 329, - | } - |} - """.stripMargin - - val expected = - """ - |{ - | "event": "group", - | "entityType": "user", - | "entityId": "userIdValue", - | "properties": { - | "group_id": "groupId", - | "traits": { - | "name": "groupName", - | "employees": 329 - | } - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, group, expected) - } - - "convert screen to event JSON" in { - val screen = - s""" - |{ $commonFields - | "type": "screen", - | "name": "screenName", - | "user_id": "userIdValue", - | "timestamp" : "2012-12-02T00:30:08.276Z", - | "properties": { - | "variation": "screenVariation" - | } - |} - """.stripMargin - - val expected = - """ - |{ - | "event": "screen", - | "entityType": "user", - | "entityId": "userIdValue", - | "properties": { - | "properties": { - | "variation": "screenVariation" - | }, - | "name": "screenName" - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, screen, expected) - } - - "convert page to event JSON" in { - val page = - s""" - |{ $commonFields - | "type": "page", - | "name": "pageName", - | "user_id": "userIdValue", - | "timestamp" : "2012-12-02T00:30:08.276Z", - | "properties": { - | 
"title": "pageTitle", - | "url": "pageUrl" - | } - |} - """.stripMargin - - val expected = - """ - |{ - | "event": "page", - | "entityType": "user", - | "entityId": "userIdValue", - | "properties": { - | "properties": { - | "title": "pageTitle", - | "url": "pageUrl" - | }, - | "name": "pageName" - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, page, expected) - } - - "convert alias to event JSON" in { - val alias = - s""" - |{ $commonFields - | "type": "alias", - | "previous_id": "previousIdValue", - | "user_id": "userIdValue", - | "timestamp" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - val expected = - """ - |{ - | "event": "alias", - | "entityType": "user", - | "entityId": "userIdValue", - | "properties": { - | "previous_id" : "previousIdValue" - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, alias, expected) - } - - "convert track to event JSON" in { - val track = - s""" - |{ $commonFields - | "user_id": "some_user_id", - | "type": "track", - | "event": "Registered", - | "timestamp" : "2012-12-02T00:30:08.276Z", - | "properties": { - | "plan": "Pro Annual", - | "accountType" : "Facebook" - | } - |} - """.stripMargin - - val expected = - """ - |{ - | "event": "track", - | "entityType": "user", - | "entityId": "some_user_id", - | "properties": { - | "event": "Registered", - | "properties": { - | "plan": "Pro Annual", - | "accountType": "Facebook" - | } - | }, - | "eventTime" : "2012-12-02T00:30:08.276Z" - |} - """.stripMargin - - check(SegmentIOConnector, track, expected) - } - - "convert identify to event JSON" in { - val identify = s""" - { $commonFields - "type" : "identify", - "user_id" : "019mr8mf4r", - "traits" : { - "email" : "[email protected]", - "name" : "Achilles", - "subscription_plan" : "Premium", - "friendCount" : 29 - }, - "timestamp" : "2012-12-02T00:30:08.276Z" - } - """ - - val expected = """ - { - "event" : "identify", - 
"entityType": "user", - "entityId" : "019mr8mf4r", - "properties" : { - "traits" : { - "email" : "[email protected]", - "name" : "Achilles", - "subscription_plan" : "Premium", - "friendCount" : 29 - } - }, - "eventTime" : "2012-12-02T00:30:08.276Z" - } - """ - - check(SegmentIOConnector, identify, expected) - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala new file mode 100644 index 0000000..62fd89c --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala @@ -0,0 +1,68 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.api + +import org.apache.predictionio.data.storage.Storage + +import akka.testkit.TestProbe +import akka.actor.ActorSystem +import akka.actor.Props + +import spray.http.HttpEntity +import spray.http.HttpResponse +import spray.http.ContentTypes +import spray.httpx.RequestBuilding.Get + +import org.specs2.mutable.Specification + +class EventServiceSpec extends Specification { + + val system = ActorSystem("EventServiceSpecSystem") + + val eventClient = Storage.getLEvents() + val accessKeysClient = Storage.getMetaDataAccessKeys() + val channelsClient = Storage.getMetaDataChannels() + + val eventServiceActor = system.actorOf( + Props( + new EventServiceActor( + eventClient, + accessKeysClient, + channelsClient, + EventServerConfig() + ) + ) + ) + + "GET / request" should { + "properly produce OK HttpResponses" in { + val probe = TestProbe()(system) + probe.send(eventServiceActor, Get("/")) + probe.expectMsg( + HttpResponse( + 200, + HttpEntity( + contentType = ContentTypes.`application/json`, + string = """{"status":"alive"}""" + ) + ) + ) + success + } + } + + step(system.shutdown()) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala new file mode 100644 index 0000000..bae0f0b --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala @@ -0,0 +1,175 @@ +package io.prediction.data.api + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import io.prediction.data.storage._ +import org.joda.time.DateTime +import org.specs2.mutable.Specification +import spray.http.HttpHeaders.RawHeader +import spray.http.{ContentTypes, HttpEntity, 
HttpResponse} +import spray.httpx.RequestBuilding._ +import sun.misc.BASE64Encoder + +import scala.concurrent.{Future, ExecutionContext} + +class SegmentIOAuthSpec extends Specification { + + val system = ActorSystem("EventServiceSpecSystem") + sequential + isolated + val eventClient = new LEvents { + override def init(appId: Int, channelId: Option[Int]): Boolean = true + + override def futureInsert(event: Event, appId: Int, channelId: Option[Int]) + (implicit ec: ExecutionContext): Future[String] = + Future successful "event_id" + + override def futureFind( + appId: Int, channelId: Option[Int], startTime: Option[DateTime], + untilTime: Option[DateTime], entityType: Option[String], + entityId: Option[String], eventNames: Option[Seq[String]], + targetEntityType: Option[Option[String]], + targetEntityId: Option[Option[String]], limit: Option[Int], + reversed: Option[Boolean]) + (implicit ec: ExecutionContext): Future[Iterator[Event]] = + Future successful List.empty[Event].iterator + + override def futureGet(eventId: String, appId: Int, channelId: Option[Int]) + (implicit ec: ExecutionContext): Future[Option[Event]] = + Future successful None + + override def remove(appId: Int, channelId: Option[Int]): Boolean = true + + override def futureDelete(eventId: String, appId: Int, channelId: Option[Int]) + (implicit ec: ExecutionContext): Future[Boolean] = + Future successful true + + override def close(): Unit = {} + } + val appId = 0 + val accessKeysClient = new AccessKeys { + override def insert(k: AccessKey): Option[String] = null + override def getByAppid(appid: Int): Seq[AccessKey] = null + override def update(k: AccessKey): Unit = {} + override def delete(k: String): Unit = {} + override def getAll(): Seq[AccessKey] = null + + override def get(k: String): Option[AccessKey] = + k match { + case "abc" => Some(AccessKey(k, appId, Seq.empty)) + case _ => None + } + } + + val channelsClient = Storage.getMetaDataChannels() + val eventServiceActor = system.actorOf( + Props( 
+ new EventServiceActor( + eventClient, + accessKeysClient, + channelsClient, + EventServerConfig() + ) + ) + ) + + val base64Encoder = new BASE64Encoder + + "Event Service" should { + + "reject with CredentialsRejected with invalid credentials" in { + val accessKey = "abc123:" + val probe = TestProbe()(system) + probe.send( + eventServiceActor, + Post("/webhooks/segmentio.json") + .withHeaders( + List( + RawHeader("Authorization", s"Basic $accessKey") + ) + ) + ) + probe.expectMsg( + HttpResponse( + 401, + HttpEntity( + contentType = ContentTypes.`application/json`, + string = """{"message":"Invalid accessKey."}""" + ) + ) + ) + success + } + + "reject with CredentialsMissed without credentials" in { + val probe = TestProbe()(system) + probe.send( + eventServiceActor, + Post("/webhooks/segmentio.json") + ) + probe.expectMsg( + HttpResponse( + 401, + HttpEntity( + contentType = ContentTypes.`application/json`, + string = """{"message":"Missing accessKey."}""" + ) + ) + ) + success + } + + "process SegmentIO identity request properly" in { + val jsonReq = + """ + |{ + | "anonymous_id": "507f191e810c19729de860ea", + | "channel": "browser", + | "context": { + | "ip": "8.8.8.8", + | "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)" + | }, + | "message_id": "022bb90c-bbac-11e4-8dfc-aa07a5b093db", + | "timestamp": "2015-02-23T22:28:55.387Z", + | "sent_at": "2015-02-23T22:28:55.111Z", + | "traits": { + | "name": "Peter Gibbons", + | "email": "[email protected]", + | "plan": "premium", + | "logins": 5 + | }, + | "type": "identify", + | "user_id": "97980cfea0067", + | "version": "2" + |} + """.stripMargin + + val accessKey = "abc:" + val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes) + val probe = TestProbe()(system) + probe.send( + eventServiceActor, + Post( + "/webhooks/segmentio.json", + HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes) + ).withHeaders( + List( + RawHeader("Authorization", s"Basic $accessKeyEncoded") + ) + ) + ) 
+ probe.expectMsg( + HttpResponse( + 201, + HttpEntity( + contentType = ContentTypes.`application/json`, + string = """{"eventId":"event_id"}""" + ) + ) + ) + success + } + } + + step(system.shutdown()) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala new file mode 100644 index 0000000..c98c882 --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala @@ -0,0 +1,196 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.data.storage + +import org.specs2.mutable._ + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD + +class BiMapSpec extends Specification { + + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + val sc = new SparkContext("local[4]", "BiMapSpec test") + + "BiMap created with map" should { + + val keys = Seq(1, 4, 6) + val orgValues = Seq(2, 5, 7) + val org = keys.zip(orgValues).toMap + val bi = BiMap(org) + + "return correct values for each key of original map" in { + val biValues = keys.map(k => bi(k)) + + biValues must beEqualTo(orgValues) + } + + "get return Option[V]" in { + val checkKeys = keys ++ Seq(12345) + val biValues = checkKeys.map(k => bi.get(k)) + val expected = orgValues.map(Some(_)) ++ Seq(None) + + biValues must beEqualTo(expected) + } + + "getOrElse return value for each key of original map" in { + val biValues = keys.map(k => bi.getOrElse(k, -1)) + + biValues must beEqualTo(orgValues) + } + + "getOrElse return default values for invalid key" in { + val keys = Seq(999, -1, -2) + val defaults = Seq(1234, 5678, 987) + val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) } + + biValues must beEqualTo(defaults) + } + + "contains() returns true/false correctly" in { + val checkKeys = keys ++ Seq(12345) + val biValues = checkKeys.map(k => bi.contains(k)) + val expected = orgValues.map(_ => true) ++ Seq(false) + + biValues must beEqualTo(expected) + } + + "same size as original map" in { + (bi.size) must beEqualTo(org.size) + } + + "take(2) returns BiMap of size 2" in { + bi.take(2).size must beEqualTo(2) + } + + "toMap contain same element as original map" in { + (bi.toMap) must beEqualTo(org) + } + + "toSeq contain same element as original map" in { + (bi.toSeq) must containTheSameElementsAs(org.toSeq) + } + + "inverse and return correct keys for each values of 
original map" in { + val biKeys = orgValues.map(v => bi.inverse(v)) + biKeys must beEqualTo(keys) + } + + "inverse with same size" in { + bi.inverse.size must beEqualTo(org.size) + } + + "inverse's inverse reference back to the same original object" in { + // NOTE: reference equality + bi.inverse.inverse == bi + } + } + + "BiMap created with duplicated values in map" should { + val dup = Map(1 -> 2, 4 -> 7, 6 -> 7) + "return IllegalArgumentException" in { + BiMap(dup) must throwA[IllegalArgumentException] + } + } + + "BiMap.stringLong and stringInt" should { + + "create BiMap from set of string" in { + val keys = Set("a", "b", "foo", "bar") + val values: Seq[Long] = Seq(0, 1, 2, 3) + + val bi = BiMap.stringLong(keys) + val biValues = keys.map(k => bi(k)) + + val biInt = BiMap.stringInt(keys) + val valuesInt: Seq[Int] = values.map(_.toInt) + val biIntValues = keys.map(k => biInt(k)) + + biValues must containTheSameElementsAs(values) and + (biIntValues must containTheSameElementsAs(valuesInt)) + } + + "create BiMap from Array of unique string" in { + val keys = Array("a", "b", "foo", "bar") + val values: Seq[Long] = Seq(0, 1, 2, 3) + + val bi = BiMap.stringLong(keys) + val biValues = keys.toSeq.map(k => bi(k)) + + val biInt = BiMap.stringInt(keys) + val valuesInt: Seq[Int] = values.map(_.toInt) + val biIntValues = keys.toSeq.map(k => biInt(k)) + + biValues must containTheSameElementsAs(values) and + (biIntValues must containTheSameElementsAs(valuesInt)) + } + + "not guarantee sequential index for Array with duplicated string" in { + val keys = Array("a", "b", "foo", "bar", "a", "b", "x") + val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6) + val values = keys.zip(dupValues).toMap.values.toSeq + + val bi = BiMap.stringLong(keys) + val biValues = keys.toSet[String].map(k => bi(k)) + + val biInt = BiMap.stringInt(keys) + val valuesInt: Seq[Int] = values.map(_.toInt) + val biIntValues = keys.toSet[String].map(k => biInt(k)) + + biValues must 
containTheSameElementsAs(values) and + (biIntValues must containTheSameElementsAs(valuesInt)) + } + + "create BiMap from RDD[String]" in { + + val keys = Seq("a", "b", "foo", "bar") + val values: Seq[Long] = Seq(0, 1, 2, 3) + val rdd = sc.parallelize(keys) + + val bi = BiMap.stringLong(rdd) + val biValues = keys.map(k => bi(k)) + + val biInt = BiMap.stringInt(rdd) + val valuesInt: Seq[Int] = values.map(_.toInt) + val biIntValues = keys.map(k => biInt(k)) + + biValues must containTheSameElementsAs(values) and + (biIntValues must containTheSameElementsAs(valuesInt)) + } + + "create BiMap from RDD[String] with duplicated string" in { + + val keys = Seq("a", "b", "foo", "bar", "a", "b", "x") + val values: Seq[Long] = Seq(0, 1, 2, 3, 4) + val rdd = sc.parallelize(keys) + + val bi = BiMap.stringLong(rdd) + val biValues = keys.distinct.map(k => bi(k)) + + val biInt = BiMap.stringInt(rdd) + val valuesInt: Seq[Int] = values.map(_.toInt) + val biIntValues = keys.distinct.map(k => biInt(k)) + + biValues must containTheSameElementsAs(values) and + (biIntValues must containTheSameElementsAs(valuesInt)) + } + } + + step(sc.stop()) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala new file mode 100644 index 0000000..46ae8dd --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala @@ -0,0 +1,243 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.specs2.mutable._ + +class DataMapSpec extends Specification { + + "DataMap" should { + + val properties = DataMap(""" + { + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c", "c"], + "prop6" : 4.56 + } + """) + + "get Int data" in { + properties.get[Int]("prop1") must beEqualTo(1) + properties.getOpt[Int]("prop1") must beEqualTo(Some(1)) + } + + "get String data" in { + properties.get[String]("prop2") must beEqualTo("value2") + properties.getOpt[String]("prop2") must beEqualTo(Some("value2")) + } + + "get List of Int data" in { + properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3)) + properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3))) + } + + "get Boolean data" in { + properties.get[Boolean]("prop4") must beEqualTo(true) + properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true)) + } + + "get List of String data" in { + properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c")) + properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c"))) + } + + "get Set of String data" in { + properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c")) + properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c"))) + } + + "get Double data" in { + properties.get[Double]("prop6") must beEqualTo(4.56) + properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56)) + } + + "get empty optional Int data" in { + 
properties.getOpt[Int]("prop9999") must beEqualTo(None) + } + + } + + "DataMap with multi-level data" should { + val properties = DataMap(""" + { + "context": { + "ip": "1.23.4.56", + "prop1": 2.345 + "prop2": "value1", + "prop4": [1, 2, 3] + }, + "anotherPropertyA": 4.567, + "anotherPropertyB": false + } + """) + + "get case class data" in { + val expected = DataMapSpec.Context( + ip = "1.23.4.56", + prop1 = Some(2.345), + prop2 = Some("value1"), + prop3 = None, + prop4 = List(1,2,3) + ) + + properties.get[DataMapSpec.Context]("context") must beEqualTo(expected) + } + + "get empty optional case class data" in { + properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None) + } + + "get double data" in { + properties.get[Double]("anotherPropertyA") must beEqualTo(4.567) + } + + "get boolean data" in { + properties.get[Boolean]("anotherPropertyB") must beEqualTo(false) + } + } + + "DataMap extract" should { + + "extract to case class object" in { + val properties = DataMap(""" + { + "prop1" : 1, + "prop2" : "value2", + "prop3" : [1, 2, 3], + "prop4" : true, + "prop5" : ["a", "b", "c", "c"], + "prop6" : 4.56 + } + """) + + val result = properties.extract[DataMapSpec.BasicProperty] + val expected = DataMapSpec.BasicProperty( + prop1 = 1, + prop2 = "value2", + prop3 = List(1,2,3), + prop4 = true, + prop5 = List("a", "b", "c", "c"), + prop6 = 4.56 + ) + + result must beEqualTo(expected) + } + + "extract with optional fields" in { + val propertiesEmpty = DataMap("""{}""") + val propertiesSome = DataMap(""" + { + "prop1" : 1, + "prop5" : ["a", "b", "c", "c"], + "prop6" : 4.56 + } + """) + + val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty] + val expectedEmpty = DataMapSpec.OptionProperty( + prop1 = None, + prop2 = None, + prop3 = None, + prop4 = None, + prop5 = None, + prop6 = None + ) + + val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty] + val expectedSome = DataMapSpec.OptionProperty( + prop1 = Some(1), + prop2 = None, 
+ prop3 = None, + prop4 = None, + prop5 = Some(List("a", "b", "c", "c")), + prop6 = Some(4.56) + ) + + resultEmpty must beEqualTo(expectedEmpty) + resultSome must beEqualTo(expectedSome) + } + + "extract to multi-level object" in { + val properties = DataMap(""" + { + "context": { + "ip": "1.23.4.56", + "prop1": 2.345 + "prop2": "value1", + "prop4": [1, 2, 3] + }, + "anotherPropertyA": 4.567, + "anotherPropertyB": false + } + """) + + val result = properties.extract[DataMapSpec.MultiLevelProperty] + val expected = DataMapSpec.MultiLevelProperty( + context = DataMapSpec.Context( + ip = "1.23.4.56", + prop1 = Some(2.345), + prop2 = Some("value1"), + prop3 = None, + prop4 = List(1,2,3) + ), + anotherPropertyA = 4.567, + anotherPropertyB = false + ) + + result must beEqualTo(expected) + } + + } +} + +object DataMapSpec { + + // define this case class inside object to avoid case class name conflict with other tests + case class Context( + ip: String, + prop1: Option[Double], + prop2: Option[String], + prop3: Option[Int], + prop4: List[Int] + ) + + case class BasicProperty( + prop1: Int, + prop2: String, + prop3: List[Int], + prop4: Boolean, + prop5: List[String], + prop6: Double + ) + + case class OptionProperty( + prop1: Option[Int], + prop2: Option[String], + prop3: Option[List[Int]], + prop4: Option[Boolean], + prop5: Option[List[String]], + prop6: Option[Double] + ) + + case class MultiLevelProperty( + context: Context, + anotherPropertyA: Double, + anotherPropertyB: Boolean + ) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala new file mode 100644 index 0000000..8c02186 --- /dev/null +++ 
b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala @@ -0,0 +1,103 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.specs2.mutable._ + +import org.json4s.JObject +import org.json4s.native.JsonMethods.parse + +import org.joda.time.DateTime + +class LEventAggregatorSpec extends Specification with TestEvents { + + "LEventAggregator.aggregateProperties()" should { + + "aggregate two entities' properties as DataMap correctly" in { + val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + val result: Map[String, DataMap] = + LEventAggregator.aggregateProperties(events.toIterator) + + val expected = Map( + "u1" -> DataMap(u1), + "u2" -> DataMap(u2) + ) + + result must beEqualTo(expected) + } + + "aggregate two entities' properties as PropertyMap correctly" in { + val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2) + val result: Map[String, PropertyMap] = + LEventAggregator.aggregateProperties(events.toIterator) + + val expected = Map( + "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime), + "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + + + "aggregate deleted entity correctly" in { + val events = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2) + + val result = LEventAggregator.aggregateProperties(events.toIterator) + val expected = Map( + 
"u2" -> PropertyMap(u2, u2BaseTime, u2LastTime) + ) + + result must beEqualTo(expected) + } + } + + + "LEventAggregator.aggregatePropertiesSingle()" should { + + "aggregate single entity properties as DataMap correctly" in { + val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2) + val eventsIt = events.toIterator + + val result: Option[DataMap] = LEventAggregator + .aggregatePropertiesSingle(eventsIt) + val expected = DataMap(u1) + + result must beEqualTo(Some(expected)) + } + + "aggregate single entity properties as PropertyMap correctly" in { + val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2) + val eventsIt = events.toIterator + + val result: Option[PropertyMap] = LEventAggregator + .aggregatePropertiesSingle(eventsIt) + val expected = PropertyMap(u1, u1BaseTime, u1LastTime) + + result must beEqualTo(Some(expected)) + } + + "aggregate deleted entity correctly" in { + // put the delete event in the middle + val events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5) + val eventsIt = events.toIterator + + val result = LEventAggregator.aggregatePropertiesSingle(eventsIt) + + result must beEqualTo(None) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala new file mode 100644 index 0000000..0639613 --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala @@ -0,0 +1,245 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.specs2._ +import org.specs2.specification.Step +
/** Acceptance specification for `LEvents` data objects.
 *
 * One shared sequence of checks (`events`) is run against two data objects
 * obtained through `Storage.getDataObject`: one for the source named by
 * `StorageTestUtils.hbaseSourceName` and one for the source named by
 * `StorageTestUtils.jdbcSourceName`. The fixture events (`r1`..`r5`,
 * `tz1`..`tz3`, `u1e1`.. and the expected `u1`/`u2` values) are inherited
 * from the `TestEvents` trait.
 */
class LEventsSpec extends Specification with TestEvents {

  /** App ID passed to every storage call made by this spec. */
  val appId = 1

  /** Channel ID used by the channel-specific checks. */
  val channelId = 12

  /** Name handed to `Storage.getDataObject`; suffixed with this instance's hash code. */
  val dbName = "test_pio_storage_events_" + hashCode

  def is = s2"""

  PredictionIO Storage LEvents Specification

    Events can be implemented by:
    - HBLEvents ${hbEvents}
    - JDBCLEvents ${jdbcLEvents}

  """

  /** Shared checks against the HBase-named source, followed by a step that
   * drops the namespace named `dbName`.
   */
  def hbEvents = sequential ^ s2"""

    HBLEvents should
    - behave like any LEvents implementation ${events(hbDO)}
    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}

  """

  /** Shared checks against the JDBC-named source. */
  def jdbcLEvents = sequential ^ s2"""

    JDBCLEvents should
    - behave like any LEvents implementation ${events(jdbcDO)}

  """

  /** The checks every `LEvents` implementation must pass.
   *
   * The fragments are declared `sequential`; later checks read what earlier
   * ones wrote (for example `findUserEvents` expects exactly the events stored
   * by `insertTestUserEvents`).
   *
   * @param eventClient the data object under test
   */
  def events(eventClient: LEvents) = sequential ^ s2"""

    init default ${initDefault(eventClient)}
    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
    insert and delete by ID ${insertAndDelete(eventClient)}
    insert test user events ${insertTestUserEvents(eventClient)}
    find user events ${findUserEvents(eventClient)}
    aggregate user properties ${aggregateUserProperties(eventClient)}
    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
    init channel ${initChannel(eventClient)}
    insert 2 events to channel ${insertChannel(eventClient)}
    insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
    find events from channel ${findChannel(eventClient)}
    remove default ${removeDefault(eventClient)}
    remove channel ${removeChannel(eventClient)}

  """

  /** `LEvents` data object for the source named by `StorageTestUtils.hbaseSourceName`. */
  def hbDO = Storage.getDataObject[LEvents](
    StorageTestUtils.hbaseSourceName,
    dbName
  )

  /** `LEvents` data object for the source named by `StorageTestUtils.jdbcSourceName`. */
  def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)

  /** Calls `init` for `appId` without a channel; the call's result is the check. */
  def initDefault(eventClient: LEvents) = {
    eventClient.init(appId)
  }

  /** Inserts `r1`, `r2`, `r3` and expects to read each back by its returned ID. */
  def insertAndGetEvents(eventClient: LEvents) =
    insertAndGet(eventClient, List(r1, r2, r3))

  /** Inserts `tz1`, `tz2`, `tz3` and expects to read each back by its returned ID. */
  def insertAndGetTimezone(eventClient: LEvents) =
    insertAndGet(eventClient, List(tz1, tz2, tz3))

  /** Inserts every fixture, fetches each one by the ID returned from `insert`,
   * and expects the fetched values to match the fixtures with that ID filled in.
   */
  private def insertAndGet(eventClient: LEvents, fixtures: List[Event]) = {
    val insertedIds: List[String] = fixtures.map { eventClient.insert(_, appId) }

    // What storage should hand back: the fixture carrying the ID it was given.
    val expected: List[Option[Event]] = fixtures.zip(insertedIds)
      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }

    val fetched = insertedIds.map { id => eventClient.get(id, appId) }

    expected must containTheSameElementsAs(fetched)
  }

  /** Inserts `r2`, then checks it is readable before `delete`, that `delete`
   * returns `true`, and that `get` returns `None` afterwards.
   */
  def insertAndDelete(eventClient: LEvents) = {
    val eventId = eventClient.insert(r2, appId)

    val resultBefore = eventClient.get(eventId, appId)

    val expectedBefore = r2.copy(eventId = Some(eventId))

    val deleteStatus = eventClient.delete(eventId, appId)

    val resultAfter = eventClient.get(eventId, appId)

    (resultBefore must beEqualTo(Some(expectedBefore))) and
    (deleteStatus must beEqualTo(true)) and
    (resultAfter must beEqualTo(None))
  }

  /** Stores the `u1`/`u2` fixture events that the find and aggregate checks read. */
  def insertTestUserEvents(eventClient: LEvents) = {
    // events from TestEvents trait
    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)

    // Only the side effect matters here; the returned IDs are not needed.
    listOfEvents.foreach { eventClient.insert(_, appId) }

    success
  }

  /** Finds all events of entity type "user" and compares them, with event IDs
   * blanked out, to the events stored by `insertTestUserEvents`.
   */
  def findUserEvents(eventClient: LEvents) = {

    val results: List[Event] = eventClient.find(
      appId = appId,
      entityType = Some("user"))
      .toList
      .map(e => e.copy(eventId = None)) // ignore eventID

    // same events in insertTestUserEvents
    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)

    results must containTheSameElementsAs(expected)
  }

  /** Aggregates properties for entity type "user" and expects entries for "u1" and "u2". */
  def aggregateUserProperties(eventClient: LEvents) = {

    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
      appId = appId,
      entityType = "user")

    val expected = Map(
      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
    )

    result must beEqualTo(expected)
  }

  /** Aggregates properties of the single entity "u1". */
  def aggregateOneUserProperties(eventClient: LEvents) = {
    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
      appId = appId,
      entityType = "user",
      entityId = "u1")

    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))

    result must beEqualTo(expected)
  }

  /** Expects `None` when aggregating the entity ID "u999999", which this spec never inserts. */
  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
      appId = appId,
      entityType = "user",
      entityId = "u999999")

    result must beEqualTo(None)
  }

  /** Calls `init` for `appId` and `channelId`; the call's result is the check. */
  def initChannel(eventClient: LEvents) = {
    eventClient.init(appId, Some(channelId))
  }

  /** Stores `r4` and `r5` in the channel; `findChannel` reads them back. */
  def insertChannel(eventClient: LEvents) = {

    // events from TestEvents trait
    val listOfEvents = List(r4, r5)

    // Only the side effect matters here; the returned IDs are not needed.
    listOfEvents.foreach { eventClient.insert(_, appId, Some(channelId)) }

    success
  }

  /** Channel variant of `insertAndDelete`: every call also passes `Some(channelId)`. */
  def insertAndDeleteChannel(eventClient: LEvents) = {

    val eventId = eventClient.insert(r2, appId, Some(channelId))

    val resultBefore = eventClient.get(eventId, appId, Some(channelId))

    val expectedBefore = r2.copy(eventId = Some(eventId))

    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))

    val resultAfter = eventClient.get(eventId, appId, Some(channelId))

    (resultBefore must beEqualTo(Some(expectedBefore))) and
    (deleteStatus must beEqualTo(true)) and
    (resultAfter must beEqualTo(None))
  }

  /** Finds all events in the channel and compares them, with event IDs blanked
   * out, to the events stored by `insertChannel`.
   */
  def findChannel(eventClient: LEvents) = {

    val results: List[Event] = eventClient.find(
      appId = appId,
      channelId = Some(channelId)
    )
    .toList
    .map(e => e.copy(eventId = None)) // ignore eventId

    // same events in insertChannel
    val expected = List(r4, r5)

    results must containTheSameElementsAs(expected)
  }

  /** Calls `remove` for `appId` without a channel; the call's result is the check. */
  def removeDefault(eventClient: LEvents) = {
    eventClient.remove(appId)
  }

  /** Calls `remove` for `appId` and `channelId`; the call's result is the check. */
  def removeChannel(eventClient: LEvents) = {
    eventClient.remove(appId, Some(channelId))
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala ---------------------------------------------------------------------- diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala new file mode 100644 index 0000000..21790ad --- /dev/null +++ b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala @@ -0,0 +1,72 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.data.storage + +import org.specs2.mutable._ + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +
/** Checks `PEventAggregator.aggregateProperties` on RDDs built from the
 * fixture events of the `TestEvents` trait, using a Spark context created
 * for this spec and stopped by the final `step`.
 */
class PEventAggregatorSpec extends Specification with TestEvents {

  // These two Spark properties are cleared before the context below is built.
  Seq("spark.driver.port", "spark.hostPort").foreach { key =>
    System.clearProperty(key)
  }

  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")

  "PEventAggregator" should {

    "aggregate two entities' properties as DataMap/PropertyMap correctly" in {
      // u1 and u2 events interleaved, not in e1..e5 order
      val shuffledEvents = Seq(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)

      val aggregated = PEventAggregator
        .aggregateProperties(sc.parallelize(shuffledEvents))
        .collectAsMap
        .toMap

      val asDataMaps = Map("u1" -> DataMap(u1), "u2" -> DataMap(u2))
      val asPropertyMaps = Map(
        "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
      )

      // The same result is compared against both expected representations.
      aggregated must beEqualTo(asDataMaps)
      aggregated must beEqualTo(asPropertyMaps)
    }

    "aggregate deleted entity correctly" in {
      // the delete event (u1ed) sits in the middle of the sequence
      val eventsWithDelete = Seq(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)

      val aggregated = PEventAggregator
        .aggregateProperties(sc.parallelize(eventsWithDelete))
        .collectAsMap
        .toMap

      // only u2 is expected in the result
      val survivors = Map("u2" -> PropertyMap(u2, u2BaseTime, u2LastTime))

      aggregated must beEqualTo(survivors)
    }

  }

  step(sc.stop())
}
