http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala new file mode 100644 index 0000000..cb1b329 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/BasicIntegrationTest.scala @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector + +import java.util.Properties +import org.apache.geode.cache.query.QueryService +import org.apache.geode.cache.query.internal.StructImpl +import io.pivotal.geode.spark.connector._ +import org.apache.geode.cache.Region +import io.pivotal.geode.spark.connector.internal.{RegionMetadata, DefaultGeodeConnectionManager} +import io.pivotal.geode.spark.connector.internal.oql.{RDDConverter, QueryRDD} +import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster +import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils +import org.apache.spark.streaming.{Seconds, StreamingContext, TestInputDStream} +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import scala.collection.JavaConversions +import scala.reflect.ClassTag + +case class Number(str: String, len: Int) + +class BasicIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster { + + var sc: SparkContext = null + + override def beforeAll() { + // start geode cluster, and spark context + val settings = new Properties() + settings.setProperty("cache-xml-file", "src/it/resources/test-regions.xml") + settings.setProperty("num-of-servers", "2") + val locatorPort = GeodeCluster.start(settings) + + // start spark context in local mode + IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", + "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG") + val conf = new SparkConf() + .setAppName("BasicIntegrationTest") + .setMaster("local[2]") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + .set(GeodeLocatorPropKey, s"localhost[$locatorPort]") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator") + + sc = new SparkContext(conf) + } + + override def afterAll() { + // stop connection, spark context, and geode cluster + 
DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf)) + sc.stop() + GeodeCluster.stop() + } + + //Convert Map[Object, Object] to java.util.Properties + private def map2Props(map: Map[Object, Object]): java.util.Properties = + (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} + + // =========================================================== + // DefaultGeodeConnection functional tests + // =========================================================== + + test("DefaultGeodeConnection.validateRegion()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + // normal exist-region + var regionPath: String = "str_str_region" + conn.validateRegion[String, String](regionPath) + + // non-exist region + regionPath = "non_exist_region" + try { + conn.validateRegion[String, String](regionPath) + fail("validateRegion failed to catch non-exist region error") + } catch { + case e: RuntimeException => + if (! e.getMessage.contains(s"The region named $regionPath was not found")) + fail("validateRegion gives wrong exception on non-exist region", e) + case e: Throwable => + fail("validateRegion gives wrong exception on non-exist region", e) + } + + // Note: currently, can't catch type mismatch error + conn.validateRegion[String, Integer]("str_str_region") + } + + test("DefaultGeodeConnection.getRegionMetadata()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + // exist region + validateRegionMetadata(conn, "obj_obj_region", true, 113, null, null, false) + validateRegionMetadata(conn, "str_int_region", true, 113, "java.lang.String", "java.lang.Integer", false) + validateRegionMetadata(conn, "str_str_rep_region", false, 0, "java.lang.String", "java.lang.String", true) + + // non-exist region + assert(! conn.getRegionMetadata("no_exist_region").isDefined) + } + + def validateRegionMetadata( + conn: GeodeConnection, regionPath: String, partitioned: Boolean, buckets: Int, + keyType: String, valueType: String, emptyMap: Boolean): Unit = { + + val mdOption = conn.getRegionMetadata(regionPath) + val md = mdOption.get + + assert(md.getRegionPath == s"/$regionPath") + assert(md.isPartitioned == partitioned) + assert(md.getKeyTypeName == keyType) + assert(md.getValueTypeName == valueType) + assert(md.getTotalBuckets == buckets) + if (emptyMap) assert(md.getServerBucketMap == null) + else assert(md.getServerBucketMap != null) + } + + test("DefaultGeodeConnection.getRegionProxy()") { + val conn = GeodeConnectionConf(sc.getConf).getConnection + + val region1 = conn.getRegionProxy[String, String]("str_str_region") + region1.put("1", "One") + assert(region1.get("1") == "One") + region1.remove("1") + assert(region1.get("1") == null) + + // getRegionProxy doesn't fail when region doesn't exist + val region2 = conn.getRegionProxy[String, String]("non_exist_region") + try { + region2.put("1", "One") + fail("getRegionProxy failed to catch non-exist region error") + } catch { + case e: Exception => + if (e.getCause == null || ! e.getCause.getMessage.contains(s"Region named /non_exist_region was not found")) { + e.printStackTrace() + fail("validateRegion gives wrong exception on non-exist region", e) + } + } + } + + // Note: DefaultGeodeConnecton.getQuery() and getRegionData() are covered by + // RetrieveRegionIntegrationTest.scala and following OQL tests. 
+ + // =========================================================== + // OQL functional tests + // =========================================================== + + private def initRegion(regionName: String): Unit = { + + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //This will call the implicit conversion map2Properties in connector package object, since it is Map[String, String] + var position1 = new Position(Map("secId" -> "SUN", "qty" -> "34000", "mktValue" -> "24.42")) + var position2 = new Position(Map("secId" -> "IBM", "qty" -> "8765", "mktValue" -> "34.29")) + val portfolio1 = new Portfolio(map2Props(Map("id" ->"1", "type" -> "type1", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("1", portfolio1) + + position1 = new Position(Map("secId" -> "YHOO", "qty" -> "9834", "mktValue" -> "12.925")) + position2 = new Position(Map("secId" -> "GOOG", "qty" -> "12176", "mktValue" -> "21.972")) + val portfolio2 = new Portfolio(map2Props(Map("id" -> "2", "type" -> "type2", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("2", portfolio2) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio3 = new Portfolio(map2Props(Map("id" -> "3", "type" -> "type3", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("3", portfolio3) + + position1 = new Position(Map("secId" -> "APPL", "qty" -> "67", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "ORCL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio4 = new Portfolio(map2Props(Map("id" -> "4", "type" -> "type1", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("4", portfolio4) + + position1 = new Position(Map("secId" -> "SAP", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "DELL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio5 = new Portfolio(map2Props(Map("id" -> "5", "type" -> "type2", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + rgn.put("5", portfolio5) + + position1 = new Position(Map("secId" -> "RHAT", "qty" -> "90", "mktValue" -> "67.356572")) + position2 = new Position(Map("secId" -> "NOVL", "qty" -> "376", "mktValue" -> "101.34")) + val portfolio6 = new Portfolio(map2Props(Map("id" -> "6", "type" -> "type3", "status" -> "inactive", + "position1" -> position1, "position2" -> position2))) + rgn.put("6", portfolio6) + + position1 = new Position(Map("secId" -> "MSFT", "qty" -> "98327", "mktValue" -> "23.32")) + position2 = new Position(Map("secId" -> "AOL", "qty" -> "978", "mktValue" -> "40.373")) + val portfolio7 = new Portfolio(map2Props(Map("id" -> "7", "type" -> "type4", "status" -> "active", + "position1" -> position1, "position2" -> position2))) + //Not using null, due to intermittent query failure on column containing null, likely a Spark SQL bug + //portfolio7.setType(null) + rgn.put("7", portfolio7) + } + + private def getQueryRDD[T: ClassTag]( + query: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf)): QueryRDD[T] = + new QueryRDD[T](sc, query, connConf) + + test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Partitioned Region") { + 
simpleQuery("obj_obj_region") + } + + test("Run Geode OQL query and convert the returned QueryRDD to DataFrame: Replicated Region") { + simpleQuery("obj_obj_rep_region") + } + + private def simpleQuery(regionName: String) { + //Populate some data in the region + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, String] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[String] = getQueryRDD[String](s"select * from /$regionName") + + //verify the QueryRDD + val oqlRS: Array[String] = OQLResult.collect() + oqlRS should have length 3 + oqlRS should contain theSameElementsAs List("one", "two", "three") + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + // this is used to implicitly convert an RDD to a DataFrame. + import sqlContext.implicits._ + val dataFrame = OQLResult.map(x => Number(x, x.length)).toDF() + //Register dataFrame as a table of two columns of type String and Int respectively + dataFrame.registerTempTable("numberTable") + + //Issue SQL query against the table + val SQLResult = sqlContext.sql("SELECT * FROM numberTable") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() + sqlRS should have length 3 + sqlRS should contain theSameElementsAs List("one", "two", "three") + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame2 = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + //Register dataFrame2 as a table of two columns of type String and Int respectively + dataFrame2.registerTempTable("numberTable2") + + //Issue SQL query against the table + val SQLResult2 = sqlContext.sql("SELECT * FROM numberTable2") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS2: Array[Any] = SQLResult2.map(r => r(0)).collect() + sqlRS2 should have length 3 + sqlRS2 should contain theSameElementsAs List("one", "two", "three") + + //Remove the region entries, because other tests might use the same region as well + List("1", "2", "3").foreach(rgn.remove) + } + + test("Run Geode OQL query and directly return DataFrame: Partitioned Region") { + simpleQueryDataFrame("obj_obj_region") + } + + test("Run Geode OQL query and directly return DataFrame: Replicated Region") { + simpleQueryDataFrame("obj_obj_rep_region") + } + + private def simpleQueryDataFrame(regionName: String) { + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[String, String] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> "one", "2" -> "two", "3" -> "three"))) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"select * from /$regionName") + dataFrame.registerTempTable("numberTable") + + //Issue SQL query against the table + val SQLResult = sqlContext.sql("SELECT * FROM numberTable") + //Verify the SQL query result, r(0) mean column 0 + val sqlRS: Array[Any] = SQLResult.map(r => r(0)).collect() + sqlRS should have length 3 + sqlRS should contain theSameElementsAs List("one", "two", "three") + + //Remove the region entries, because other tests might use the same region as well + List("1", "2", "3").foreach(rgn.remove) + } + + 
test("Geode OQL query with UDT: Partitioned Region") { + queryUDT("obj_obj_region") + } + + test("Geode OQL query with UDT: Replicated Region") { + queryUDT("obj_obj_rep_region") + } + + private def queryUDT(regionName: String) { + + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"select name, age from /$regionName") + + //verify the QueryRDD + val oqlRS: Array[Object] = OQLResult.collect() + oqlRS should have length 2 + oqlRS.map(e => e.asInstanceOf[StructImpl].getFieldValues.apply(1)) should contain theSameElementsAs List(123, 456) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("Geode OQL query with UDT and directly return DataFrame: Partitioned Region") { + queryUDTDataFrame("obj_obj_region") + } + + test("Geode OQL query with UDT and directly return DataFrame: Replicated Region") { + queryUDTDataFrame("obj_obj_rep_region") + } + + private def queryUDTDataFrame(regionName: String) { + //Populate some data in the region + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + val e1: Employee = new Employee("hello", 123) + val e2: Employee = new Employee("world", 456) + rgn.putAll(JavaConversions.mapAsJavaMap(Map("1" -> e1, "2" -> e2))) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"select name, age from /$regionName") + + dataFrame.registerTempTable("employee") + val SQLResult = sqlContext.sql("SELECT * FROM employee") + + //Verify the SQL query result + val sqlRS = SQLResult.map(r => r(0)).collect() + sqlRS should have length 2 + sqlRS should contain theSameElementsAs List("hello", "world") + + List("1", "2").foreach(rgn.remove) + } + + test("Geode OQL query with more complex UDT: Partitioned Region") { + complexUDT("obj_obj_region") + } + + test("Geode OQL query with more complex UDT: Replicated Region") { + complexUDT("obj_obj_rep_region") + } + + private def complexUDT(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(r => r.asInstanceOf[Portfolio].getId) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + + dataFrame.registerTempTable("Portfolio") + + 
val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("Geode OQL query with more complex UDT and directly return DataFrame: Partitioned Region") { + complexUDTDataFrame("obj_obj_region") + } + + test("Geode OQL query with more complex UDT and directly return DataFrame: Replicated Region") { + complexUDTDataFrame("obj_obj_rep_region") + } + + private def complexUDTDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"SELECT DISTINCT * FROM /$regionName WHERE status = 'active'") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT * FROM Portfolio") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0).asInstanceOf[Portfolio].getType) + sqlRS should contain theSameElementsAs List("type1", "type2", "type3", "type4") + } + + test("Geode OQL query with more complex UDT with Projection: Partitioned Region") { + queryComplexUDTProjection("obj_obj_region") + } + + test("Geode OQL query with more complex UDT with Projection: Replicated Region") { + queryComplexUDTProjection("obj_obj_rep_region") + } + + private def queryComplexUDTProjection(regionName: String) { + + initRegion(regionName) + + //Create QueryRDD using OQL + val OQLResult: QueryRDD[Object] = getQueryRDD[Object](s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + + //verify the QueryRDD + val oqlRS: Array[Int] = OQLResult.collect().map(si => si.asInstanceOf[StructImpl].getFieldValues.apply(0).asInstanceOf[Int]) + oqlRS should contain theSameElementsAs List(1, 3, 5, 7) + + //Convert QueryRDD to DataFrame + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + + //Convert QueryRDD to DataFrame using RDDConverter + val dataFrame = RDDConverter.queryRDDToDataFrame(OQLResult, sqlContext) + + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Partitioned Region") { + queryComplexUDTProjectionDataFrame("obj_obj_region") + } + + test("Geode OQL query with more complex UDT with Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"""SELECT id, "type", positions, status FROM /$regionName WHERE status = 'active'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Geode OQL query with more complex UDT with nested Projection and directly return DataFrame: Partitioned Region") { + queryComplexUDTNestProjectionDataFrame("obj_obj_region") + } + + test("Geode OQL 
query with more complex UDT with nested Projection and directly return DataFrame: Replicated Region") { + queryComplexUDTNestProjectionDataFrame("obj_obj_rep_region") + } + + private def queryComplexUDTNestProjectionDataFrame(regionName: String) { + + initRegion(regionName) + + //Create DataFrame using Geode OQL + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"""SELECT r.id, r."type", r.positions, r.status FROM /$regionName r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'""") + dataFrame.registerTempTable("Portfolio") + + val SQLResult = sqlContext.sql("SELECT id, type FROM Portfolio where type = 'type3'") + + //Verify the SQL query result + val sqlRS = SQLResult.collect().map(r => r(0)) + sqlRS should contain theSameElementsAs List(3) + } + + test("Undefined instance deserialization: Partitioned Region") { + undefinedInstanceDeserialization("obj_obj_region") + } + + test("Undefined instance deserialization: Replicated Region") { + undefinedInstanceDeserialization("obj_obj_rep_region") + } + + private def undefinedInstanceDeserialization(regionName: String) { + + val conn = GeodeConnectionConf(sc.getConf).getConnection + val rgn: Region[Object, Object] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + + //Put some new data + rgn.put("1", "one") + + //Query some non-existent columns, which should return UNDEFINED + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + val dataFrame = sqlContext.geodeOQL(s"SELECT col100, col200 FROM /$regionName") + val col1 = dataFrame.first().apply(0) + val col2 = dataFrame.first().apply(1) + assert(col1 == QueryService.UNDEFINED) + assert(col2 == QueryService.UNDEFINED) + //Verify that col1 and col2 refer to the same Undefined object + assert(col1.asInstanceOf[AnyRef] eq col2.asInstanceOf[AnyRef]) + } + + test("RDD.saveToGeode") { + val regionName = "str_str_region" + // generate: Vector((1,11), (2,22), (3,33), (4,44), (5,55), (6,66)) + val data = (1 to 6).map(_.toString).map(e=> (e, e*2)) + val rdd = sc.parallelize(data) + rdd.saveToGeode(regionName) + + // verify + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val region: Region[String, String] = connConf.getConnection.getRegionProxy(regionName) + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 6).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 6).map(_.toString).foreach(e => assert(e*2 == region.get(e))) + } + + // =========================================================== + // DStream.saveToGeode() functional tests + // =========================================================== + + test("Basic DStream test") { + import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener} + import io.pivotal.geode.spark.connector.streaming._ + import org.apache.spark.streaming.ManualClockHelper + + class TestStreamListener extends StreamingListener { + var count = 0 + override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = count += 1 + } + + def batchDuration = Seconds(1) + val ssc = new StreamingContext(sc, batchDuration) + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val dstream = new TestInputDStream(ssc, input, 2) + dstream.saveToGeode[String, Int]("str_int_region", (e: Int) => (e.toString, e)) + try { + val listener = new TestStreamListener + ssc.addStreamingListener(listener) + ssc.start() + ManualClockHelper.addToTime(ssc, batchDuration.milliseconds * input.length) + 
while (listener.count < input.length) ssc.awaitTerminationOrTimeout(50) + } catch { + case e: Exception => e.printStackTrace(); throw e +// } finally { +// ssc.stop() + } + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val region: Region[String, Int] = conn.getRegionProxy("str_int_region") + + // verify geode region contents + println("region key set on server: " + region.keySetOnServer()) + assert((1 to 12).map(_.toString).toSet == JavaConversions.asScalaSet(region.keySetOnServer())) + (1 to 12).foreach(e => assert(e == region.get(e.toString))) + } +}
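
The tests above exercise the connector's public API end to end. As a rough orientation, the sketch below shows how an application might combine the same calls (saveToGeode, geodeRegion and geodeOQL) outside of a test harness. It is illustrative only and not part of this patch: the object name ConnectorUsageSketch is hypothetical, the locator address localhost[10334] is a placeholder for a running Geode cluster, and it reuses the str_str_region region defined in the test cache XML above.

    import io.pivotal.geode.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    object ConnectorUsageSketch {
      def main(args: Array[String]): Unit = {
        // Assumed locator address; replace with the locator of a running Geode cluster.
        val conf = new SparkConf()
          .setAppName("ConnectorUsageSketch")
          .setMaster("local[2]")
          .set(GeodeLocatorPropKey, "localhost[10334]")
        val sc = new SparkContext(conf)

        // Write a pair RDD into the region, as RDD.saveToGeode does in the test above.
        val data = (1 to 6).map(i => (i.toString, s"value_$i"))
        sc.parallelize(data).saveToGeode("str_str_region")

        // Read the region back as an RDD of key/value pairs.
        val pairs = sc.geodeRegion[String, String]("str_str_region").collect()
        pairs.foreach(println)

        // Run a Geode OQL query and work with the result as a Spark SQL table.
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        val df = sqlContext.geodeOQL("select * from /str_str_region")
        df.registerTempTable("strings")
        sqlContext.sql("SELECT * FROM strings").collect().foreach(println)

        sc.stop()
      }
    }
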
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala new file mode 100644 index 0000000..04d4198 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RDDJoinRegionIntegrationTest.scala @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector + +import java.util.Properties + +import io.pivotal.geode.spark.connector._ +import org.apache.geode.cache.Region +import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager +import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster +import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import java.util.{HashMap => JHashMap} + +class RDDJoinRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster { + + var sc: SparkContext = null + val numServers = 3 + val numObjects = 1000 + + override def beforeAll() { + // start geode cluster, and spark context + val settings = new Properties() + settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml") + settings.setProperty("num-of-servers", numServers.toString) + val locatorPort = GeodeCluster.start(settings) + + // start spark context in local mode + IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", + "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG") + val conf = new SparkConf() + .setAppName("RDDJoinRegionIntegrationTest") + .setMaster("local[2]") + .set(GeodeLocatorPropKey, s"localhost[$locatorPort]") + sc = new SparkContext(conf) + } + + override def afterAll() { + // stop connection, spark context, and geode cluster + DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf)) + sc.stop() + GeodeCluster.stop() + } + +// def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = { +// assert(map1.size == map2.size) +// map1.foreach(e => { +// assert(map2.contains(e._1)) +// assert (e._2 == map2.get(e._1).get) +// }) +// } + + // -------------------------------------------------------------------------------------------- + // PairRDD.joinGeodeRegion[K2 <: K, V2](regionPath, connConf): 
GeodeJoinRDD[(K, V), K, V2] + // -------------------------------------------------------------------------------------------- + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", JoinTest) { + verifyPairRDDJoinRegionWithSameKeyType("rr_str_int_region") + } + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", JoinTest) { + verifyPairRDDJoinRegionWithSameKeyType("pr_str_int_region") + } + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", JoinTest) { + verifyPairRDDJoinRegionWithSameKeyType("pr_r_str_int_region") + } + + def verifyPairRDDJoinRegionWithSameKeyType(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => ("k_" + x, x*2)) + val rdd = sc.parallelize(data) + + val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, connConf) + val rdd2Content = rdd2.collect() + + val expectedMap = (0 until 50).map(i => ((s"k_$i", i*2), i)).toMap + // matchMaps[(String, Int), Int](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + + // ------------------------------------------------------------------------------------------------------ + // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2] + // ------------------------------------------------------------------------------------------------------- + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", JoinTest) { + verifyPairRDDJoinRegionWithDiffKeyType("rr_str_int_region") + } + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", JoinTest) { + verifyPairRDDJoinRegionWithDiffKeyType("pr_str_int_region") + } + + test("PairRDD.joinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", JoinTest) { + verifyPairRDDJoinRegionWithDiffKeyType("pr_r_str_int_region") + } + + def verifyPairRDDJoinRegionWithDiffKeyType(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => (x, x*2)) + val rdd = sc.parallelize(data) + + val func :((Int, Int)) => String = pair => s"k_${pair._1}" + + val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, func /*, connConf*/) + val rdd2Content = rdd2.collect() + + val expectedMap = (0 until 50).map(i => ((i, i*2), i)).toMap + // matchMaps[(Int, Int), Int](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + + // ------------------------------------------------------------------------------------------------ + // PairRDD.outerJoinGeodeRegion[K2 <: K, V2](regionPath, connConf): GeodeJoinRDD[(K, V), K, V2] + // ------------------------------------------------------------------------------------------------ + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], replicated region", OuterJoinTest) { + 
verifyPairRDDOuterJoinRegionWithSameKeyType("rr_str_int_region") + } + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned region", OuterJoinTest) { + verifyPairRDDOuterJoinRegionWithSameKeyType("pr_str_int_region") + } + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K, V2], partitioned redundant region", OuterJoinTest) { + verifyPairRDDOuterJoinRegionWithSameKeyType("pr_r_str_int_region") + } + + def verifyPairRDDOuterJoinRegionWithSameKeyType(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => ("k_" + x, x*2)) + val rdd = sc.parallelize(data) + + val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath /*, connConf*/) + val rdd2Content = rdd2.collect() + + val expectedMap = (-5 until 50).map { + i => if (i < 0) ((s"k_$i", i * 2), None) + else ((s"k_$i", i*2), Some(i))}.toMap + // matchMaps[(String, Int), Option[Int]](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + + // ------------------------------------------------------------------------------------------------------ + // PairRDD.joinGeodeRegion[K2, V2](regionPath, ((K, V)) => K2, connConf): GeodeJoinRDD[(K, V), K2, V2] + // ------------------------------------------------------------------------------------------------------- + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], replicated region", OuterJoinTest) { + verifyPairRDDOuterJoinRegionWithDiffKeyType("rr_str_int_region") + } + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned region", OuterJoinTest) { + verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_str_int_region") + } + + test("PairRDD.outerJoinGeodeRegion: RDD[K, V] with Region[K2, V2], partitioned redundant region", OuterJoinTest) { + verifyPairRDDOuterJoinRegionWithDiffKeyType("pr_r_str_int_region") + } + + def verifyPairRDDOuterJoinRegionWithDiffKeyType(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => (x, x*2)) + val rdd = sc.parallelize(data) + + val func :((Int, Int)) => String = pair => s"k_${pair._1}" + + val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, func, connConf) + val rdd2Content = rdd2.collect() + + val expectedMap = (-5 until 50).map { + i => if (i < 0) ((i, i * 2), None) + else ((i, i*2), Some(i))}.toMap + // matchMaps[(Int, Int), Option[Int]](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + + // -------------------------------------------------------------------------------------------- + // RDD.joinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V] + // -------------------------------------------------------------------------------------------- + + test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], replicated region", JoinTest) { + verifyRDDJoinRegion("rr_str_int_region") + } + + 
test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned region", JoinTest) { + verifyRDDJoinRegion("pr_str_int_region") + } + + test("RDD.joinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", JoinTest) { + verifyRDDJoinRegion("pr_r_str_int_region") + } + + def verifyRDDJoinRegion(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => s"k_$x") + val rdd = sc.parallelize(data) + + val rdd2 = rdd.joinGeodeRegion[String, Int](regionPath, x => x, connConf) + val rdd2Content = rdd2.collect() + + val expectedMap = (0 until 50).map(i => (s"k_$i", i)).toMap + // matchMaps[String, Int](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + + // -------------------------------------------------------------------------------------------- + // RDD.outerJoinGeodeRegion[K, V](regionPath, T => K, connConf): GeodeJoinRDD[T, K, V] + // -------------------------------------------------------------------------------------------- + + test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], replicated region", OnlyTest) { + verifyRDDOuterJoinRegion("rr_str_int_region") + } + + test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned region", OnlyTest) { + verifyRDDOuterJoinRegion("pr_str_int_region") + } + + test("RDD.outerJoinGeodeRegion: RDD[T] with Region[K, V], partitioned redundant region", OnlyTest) { + verifyRDDOuterJoinRegion("pr_r_str_int_region") + } + + def verifyRDDOuterJoinRegion(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("k_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val data = (-5 until 50).map(x => s"k_$x") + val rdd = sc.parallelize(data) + + val rdd2 = rdd.outerJoinGeodeRegion[String, Int](regionPath, x => x /*, connConf */) + val rdd2Content = rdd2.collect() + + val expectedMap = (-5 until 50).map { + i => if (i < 0) (s"k_$i", None) + else (s"k_$i", Some(i))}.toMap + // matchMaps[String, Option[Int]](expectedMap, rdd2Content.toMap) + assert(expectedMap == rdd2Content.toMap) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala new file mode 100644 index 0000000..93e7cbf --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/RetrieveRegionIntegrationTest.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector + +import java.util.Properties + +import io.pivotal.geode.spark.connector._ +import org.apache.geode.cache.Region +import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionManager +import ittest.io.pivotal.geode.spark.connector.testkit.GeodeCluster +import ittest.io.pivotal.geode.spark.connector.testkit.IOUtils +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.{Tag, BeforeAndAfterAll, FunSuite, Matchers} +import java.util.{HashMap => JHashMap} + + +class RetrieveRegionIntegrationTest extends FunSuite with Matchers with BeforeAndAfterAll with GeodeCluster { + + var sc: SparkContext = null + val numServers = 4 + val numObjects = 1000 + + override def beforeAll() { + // start geode cluster, and spark context + val settings = new Properties() + settings.setProperty("cache-xml-file", "src/it/resources/test-retrieve-regions.xml") + settings.setProperty("num-of-servers", numServers.toString) + val locatorPort = GeodeCluster.start(settings) + + // start spark context in local mode + IOUtils.configTestLog4j("ERROR", "log4j.logger.org.apache.spark" -> "INFO", + "log4j.logger.io.pivotal.geode.spark.connector" -> "DEBUG") + val conf = new SparkConf() + .setAppName("RetrieveRegionIntegrationTest") + .setMaster("local[2]") + .set(GeodeLocatorPropKey, s"localhost[$locatorPort]") + sc = new SparkContext(conf) + } + + override def afterAll() { + // stop connection, spark context, and geode cluster + DefaultGeodeConnectionManager.closeConnection(GeodeConnectionConf(sc.getConf)) + sc.stop() + GeodeCluster.stop() + } + + def executeTest[K,V](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,V]) = { + //Populate some data in the region + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[K, V] = conn.getRegionProxy(regionName) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + verifyRetrieveRegion[K,V](regionName, entriesMap) + } + + def verifyRetrieveRegion[K,V](regionName:String, entriesMap:java.util.Map[K,V]) = { + val rdd = sc.geodeRegion(regionName) + val collectedObjs = rdd.collect() + collectedObjs should have length entriesMap.size + import scala.collection.JavaConverters._ + matchMaps[K,V](entriesMap.asScala.toMap, collectedObjs.toMap) + } + + def matchMaps[K,V](map1:Map[K,V], map2:Map[K,V]) = { + assert(map1.size == map2.size) + map1.foreach(e => { + assert(map2.contains(e._1)) + assert (e._2 == map2.get(e._1).get) + } + ) + } + + //Retrieve region for Partitioned Region where some nodes are empty (empty iterator) + //This test has to run first...the rest of the tests always use the same num objects + test("Retrieve Region for PR where some nodes are empty (Empty Iterator)") { + val numObjects = numServers - 1 + val 
entriesMap:JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) + executeTest[String, Int]("rr_str_int_region", numObjects, entriesMap) + } + + //Test for retrieving from region containing string key and string value + def verifyRetrieveStringStringRegion(regionName:String) = { + val entriesMap:JHashMap[String, String] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, "value_" + i)) + executeTest[String, String](regionName, numObjects, entriesMap) + } + + test("Retrieve Region with replicate redundant string string") { + verifyRetrieveStringStringRegion("rr_obj_obj_region") + } + + test("Retrieve Region with partitioned string string") { + verifyRetrieveStringStringRegion("pr_obj_obj_region") + } + + test("Retrieve Region with partitioned redundant string string") { + verifyRetrieveStringStringRegion("pr_r_obj_obj_region") + } + + + //Test for retrieving from region containing string key and int value + def verifyRetrieveStringIntRegion(regionName:String) = { + val entriesMap:JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) + executeTest[String, Int](regionName, numObjects, entriesMap) + } + + test("Retrieve Region with replicate string int region") { + verifyRetrieveStringIntRegion("rr_str_int_region") + } + + test("Retrieve Region with partitioned string int region") { + verifyRetrieveStringIntRegion("pr_str_int_region") + } + + test("Retrieve Region with partitioned redundant string int region") { + verifyRetrieveStringIntRegion("pr_r_str_int_region") + } + + //Tests for retrieving from region containing string key and object value + def verifyRetrieveStringObjectRegion(regionName:String) = { + val entriesMap:JHashMap[String, Object] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, new Employee("ename" + i, i))) + executeTest[String, Object](regionName, numObjects, entriesMap) + } + + test("Retrieve Region with replicate string obj") { + verifyRetrieveStringObjectRegion("rr_obj_obj_region") + } + + test("Retrieve Region with partitioned string obj") { + verifyRetrieveStringObjectRegion("pr_obj_obj_region") + } + + test("Retrieve Region with partitioned redundant string obj") { + verifyRetrieveStringObjectRegion("pr_r_obj_obj_region") + } + + //Test for retrieving from region containing string key and map value + def verifyRetrieveStringMapRegion(regionName:String) = { + val entriesMap:JHashMap[String,JHashMap[String,String]] = new JHashMap() + (0 until numObjects).map(i => { + val hashMap:JHashMap[String, String] = new JHashMap() + hashMap.put("mapKey:" + i, "mapValue:" + i) + entriesMap.put("key_" + i, hashMap) + }) + executeTest(regionName, numObjects, entriesMap) + } + + test("Retrieve Region with replicate string map region") { + verifyRetrieveStringMapRegion("rr_obj_obj_region") + } + + test("Retrieve Region with partitioned string map region") { + verifyRetrieveStringMapRegion("pr_obj_obj_region") + } + + test("Retrieve Region with partitioned redundant string map region") { + verifyRetrieveStringMapRegion("pr_r_obj_obj_region") + } + + //Test and helpers specific for retrieving from region containing string key and byte[] value + def executeTestWithByteArrayValues[K](regionName:String, numObjects:Int, entriesMap:java.util.Map[K,Array[Byte]]) = { + //Populate some data in the region + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[K, Array[Byte]] = 
conn.getRegionProxy(regionName) + rgn.putAll(entriesMap) + verifyRetrieveRegionWithByteArrayValues[K](regionName, entriesMap) + } + + def verifyRetrieveRegionWithByteArrayValues[K](regionName:String, entriesMap:java.util.Map[K,Array[Byte]]) = { + val rdd = sc.geodeRegion(regionName) + val collectedObjs = rdd.collect() + collectedObjs should have length entriesMap.size + import scala.collection.JavaConverters._ + matchByteArrayMaps[K](entriesMap.asScala.toMap, collectedObjs.toMap) + } + + def matchByteArrayMaps[K](map1:Map[K,Array[Byte]], map2:Map[K,Array[Byte]]) = { + map1.foreach(e => { + assert(map2.contains(e._1)) + assert (java.util.Arrays.equals(e._2, map2.get(e._1).get)) + } + ) + assert(map1.size == map2.size) + + } + + def verifyRetrieveStringByteArrayRegion(regionName:String) = { + val entriesMap:JHashMap[String, Array[Byte]] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, Array[Byte](192.toByte, 168.toByte, 0, i.toByte))) + executeTestWithByteArrayValues[String](regionName, numObjects, entriesMap) + } + + test("Retrieve Region with replicate region string byte[] region") { + verifyRetrieveStringByteArrayRegion("rr_obj_obj_region") + } + + test("Retrieve Region with partition region string byte[] region") { + verifyRetrieveStringByteArrayRegion("pr_obj_obj_region") + } + + test("Retrieve Region with partition redundant region string byte[] region") { + verifyRetrieveStringByteArrayRegion("pr_r_obj_obj_region") + } + + test("Retrieve Region with where clause on partitioned redundant region", FilterTest) { + verifyRetrieveRegionWithWhereClause("pr_r_str_int_region") + } + + test("Retrieve Region with where clause on partitioned region", FilterTest) { + verifyRetrieveRegionWithWhereClause("pr_str_int_region") + } + + test("Retrieve Region with where clause on replicated region", FilterTest) { + verifyRetrieveRegionWithWhereClause("rr_str_int_region") + } + + def verifyRetrieveRegionWithWhereClause(regionPath: String): Unit = { + val entriesMap: JHashMap[String, Int] = new JHashMap() + (0 until numObjects).map(i => entriesMap.put("key_" + i, i)) + + val connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf) + val conn = connConf.getConnection + val rgn: Region[String, Int] = conn.getRegionProxy(regionPath) + rgn.removeAll(rgn.keySetOnServer()) + rgn.putAll(entriesMap) + + val rdd = sc.geodeRegion(regionPath).where("value.intValue() < 50") + val expectedMap = (0 until 50).map(i => (s"key_$i", i)).toMap + val collectedObjs = rdd.collect() + // collectedObjs should have length expectedMap.size + matchMaps[String, Int](expectedMap, collectedObjs.toMap) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala new file mode 100644 index 0000000..b8571d8 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/package.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark + +import org.scalatest.Tag + +package object connector { + + object OnlyTest extends Tag("OnlyTest") + object FetchDataTest extends Tag("FetchDateTest") + object FilterTest extends Tag("FilterTest") + object JoinTest extends Tag("JoinTest") + object OuterJoinTest extends Tag("OuterJoinTest") + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala new file mode 100644 index 0000000..18b2fd7 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeCluster.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ittest.io.pivotal.geode.spark.connector.testkit + +import java.util.Properties + +trait GeodeCluster { + def startGeodeCluster(settings: Properties): Int = { + println("=== GeodeCluster start()") + GeodeCluster.start(settings) + } +} + +object GeodeCluster { + private var geode: Option[GeodeRunner] = None + + def start(settings: Properties): Int = { + geode.map(_.stopGeodeCluster()) // Clean up any old running Geode instances + val runner = new GeodeRunner(settings) + geode = Some(runner) + runner.getLocatorPort + } + + def stop(): Unit = { + println("=== GeodeCluster shutdown: " + geode.toString) + geode match { + case None => println("Nothing to shutdown.") + case Some(runner) => runner.stopGeodeCluster() + } + geode = None + println("=== GeodeCluster shutdown finished.") + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala new file mode 100644 index 0000000..725a012 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/GeodeRunner.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector.testkit + +import java.io.{IOException, File} +import java.net.InetAddress +import java.util.Properties +import org.apache.commons.httpclient.HttpClient +import org.apache.commons.io.FileUtils +import org.apache.commons.io.filefilter.IOFileFilter + +/** +* A class that manages Geode locator and servers. Uses gfsh to +* start and stop the locator and servers. +*/ +class GeodeRunner(settings: Properties) { + val gfshCmd = new File(getCurrentDirectory, "../../geode-assembly/build/install/apache-geode/bin/gfsh").toString + val cacheXMLFile = settings.get("cache-xml-file") + val numServers: Int = settings.get("num-of-servers").asInstanceOf[String].toInt + val cwd = new File(".").getAbsolutePath + val geodeFunctionsTargetDir = new File("../geode-functions/target") + val testroot = "target/testgeode" + val classpath = new File(cwd, "target/scala-2.10/it-classes/") + val locatorPort = startGeodeCluster(numServers) + + def getLocatorPort: Int = locatorPort + + private def getCurrentDirectory = new File( "." 
).getCanonicalPath + + private def startGeodeCluster(numServers: Int): Int = { + //ports(0) for Geode locator, the other ports are for Geode servers + val ports: Seq[Int] = IOUtils.getRandomAvailableTCPPorts(2 + numServers) + startGeodeLocator(ports(0), ports(1)) + startGeodeServers(ports(0), ports.drop(2)) + registerFunctions(ports(1)) + ports(0) + } + + private def startGeodeLocator(locatorPort: Int, jmxHttpPort:Int) { + println(s"=== GeodeRunner: starting locator on port $locatorPort") + val locatorDir = new File(cwd, s"$testroot/locator") + if (locatorDir.exists()) + FileUtils.deleteDirectory(locatorDir) + IOUtils.mkdir(locatorDir) + new ProcessBuilder() + .command(gfshCmd, "start", "locator", + "--name=locator", + s"--dir=$locatorDir", + s"--port=$locatorPort", + s"--J=-Dgemfire.jmx-manager-http-port=$jmxHttpPort") + .inheritIO() + .start() + + // Wait 30 seconds for locator to start + println(s"=== GeodeRunner: waiting for locator on port $locatorPort") + if (!IOUtils.waitForPortOpen(InetAddress.getByName("localhost"), locatorPort, 30000)) + throw new IOException("Failed to start Geode locator.") + println(s"=== GeodeRunner: done waiting for locator on port $locatorPort") + } + + private def startGeodeServers(locatorPort: Int, serverPorts: Seq[Int]) { + val procs = for (i <- 0 until serverPorts.length) yield { + println(s"=== GeodeRunner: starting server${i+1} with clientPort ${serverPorts(i)}") + val serverDir = new File(cwd, s"$testroot/server${i+1}") + if (serverDir.exists()) + FileUtils.deleteDirectory(serverDir) + IOUtils.mkdir(serverDir) + new ProcessBuilder() + .command(gfshCmd, "start", "server", + s"--name=server${i+1}", + s"--locators=localhost[$locatorPort]", + s"--bind-address=localhost", + s"--server-port=${serverPorts(i)}", + s"--dir=$serverDir", + s"--cache-xml-file=$cacheXMLFile", + s"--classpath=$classpath") + .inheritIO() + .start() + } + procs.foreach(p => p.waitFor) + println(s"All ${serverPorts.length} servers have been started") + } + + private def registerFunctions(jmxHttpPort:Int) { + import scala.collection.JavaConversions._ + FileUtils.listFiles(geodeFunctionsTargetDir, fileFilter, dirFilter).foreach{ f => registerFunction(jmxHttpPort, f)} + } + + def fileFilter = new IOFileFilter { + def accept (file: File) = file.getName.endsWith(".jar") && file.getName.startsWith("geode-functions") + def accept (dir: File, name: String) = name.endsWith(".jar") && name.startsWith("geode-functions") + } + + def dirFilter = new IOFileFilter { + def accept (file: File) = file.getName.startsWith("scala") + def accept (dir: File, name: String) = name.startsWith("scala") + } + + private def registerFunction(jmxHttpPort:Int, jar:File) { + println("Deploying: " + jar.getName) + import io.pivotal.geode.spark.connector.GeodeFunctionDeployer + val deployer = new GeodeFunctionDeployer(new HttpClient()) + deployer.deploy("localhost", jmxHttpPort, jar) + } + + def stopGeodeCluster(): Unit = { + stopGeodeServers(numServers) + stopGeodeLocator() + if (!IOUtils.waitForPortClose(InetAddress.getByName("localhost"), getLocatorPort, 30000)) + throw new IOException(s"Failed to stop Geode locator at port $getLocatorPort.") + println(s"Successfully stopped Geode locator at port $getLocatorPort.") + } + + private def stopGeodeLocator() { + println(s"=== GeodeRunner: stop locator") + val p = new ProcessBuilder() + .inheritIO() + .command(gfshCmd, "stop", "locator", s"--dir=$testroot/locator") + .start() + p.waitFor() + } + + private def stopGeodeServers(numServers: Int) { + val procs = for (i <-1 to 
numServers) yield { + println(s"=== GeodeRunner: stop server $i.") + new ProcessBuilder() + .inheritIO() + .command(gfshCmd, "stop", "server", s"--dir=$testroot/server$i") + .start() + } + procs.foreach(p => p.waitFor()) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/03e60a67/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala new file mode 100644 index 0000000..6d667e9 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/it/scala/ittest/org/apache/geode/spark/connector/testkit/IOUtils.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ittest.io.pivotal.geode.spark.connector.testkit + +import java.io.{File, IOException} +import java.net.{InetAddress, Socket} +import org.apache.geode.internal.AvailablePort +import scala.util.Try +import org.apache.log4j.PropertyConfigurator +import java.util.Properties + +object IOUtils { + + /** Makes a new directory or throws an `IOException` if it cannot be made */ + def mkdir(dir: File): File = { + if (!dir.mkdirs()) + throw new IOException(s"Could not create dir $dir") + dir + } + + private def socketPortProb(host: InetAddress, port: Int) = Iterator.continually { + Try { + Thread.sleep(100) + new Socket(host, port).close() + } + } + + /** + * Waits until a port at the given address is open or timeout passes. + * @return true if managed to connect to the port, false if timeout happened first + */ + def waitForPortOpen(host: InetAddress, port: Int, timeout: Long): Boolean = { + val startTime = System.currentTimeMillis() + socketPortProb(host, port) + .dropWhile(p => p.isFailure && System.currentTimeMillis() - startTime < timeout) + .next() + .isSuccess + } + + /** + * Waits until a port at the given address is close or timeout passes. + * @return true if host:port is un-connect-able, false if timeout happened first + */ + def waitForPortClose(host: InetAddress, port: Int, timeout: Long): Boolean = { + val startTime = System.currentTimeMillis() + socketPortProb(host, port) + .dropWhile(p => p.isSuccess && System.currentTimeMillis() - startTime < timeout) + .next() + .isFailure + } + + /** + * Returns array of unique randomly available tcp ports of specified count. 
+ */ + def getRandomAvailableTCPPorts(count: Int): Seq[Int] = + (0 until count).map(x => AvailablePort.getRandomAvailablePortKeeper(AvailablePort.SOCKET)) + .map{x => x.release(); x.getPort}.toArray + + /** + * Configure log4j properties used for integration tests + */ + def configTestLog4j(level: String, props: (String, String)*): Unit = { + val pro = new Properties() + props.foreach(p => pro.put(p._1, p._2)) + configTestLog4j(level, pro) + } + + def configTestLog4j(level: String, props: Properties): Unit = { + val pro = new Properties() + pro.put("log4j.rootLogger", s"$level, console") + pro.put("log4j.appender.console", "org.apache.log4j.ConsoleAppender") + pro.put("log4j.appender.console.target", "System.err") + pro.put("log4j.appender.console.layout", "org.apache.log4j.PatternLayout") + pro.put("log4j.appender.console.layout.ConversionPattern", + "%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n") + pro.putAll(props) + PropertyConfigurator.configure(pro) + + } +}
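
The waitForPortOpen and waitForPortClose helpers above share one idiom: an infinite Iterator.continually of Try-wrapped connection attempts, advanced with dropWhile until the attempt succeeds (or fails, for the close case) or the timeout elapses. The fragment below restates that idiom as a generic helper purely for illustration; it is not part of the patch, the names PollSketch and waitUntilSuccess are hypothetical, and port 10334 is a placeholder.

    import java.net.{InetAddress, Socket}
    import scala.util.Try

    object PollSketch {
      /** Polls `check` every 100 ms until it succeeds or `timeoutMs` elapses; true on success. */
      def waitUntilSuccess(timeoutMs: Long)(check: => Unit): Boolean = {
        val start = System.currentTimeMillis()
        Iterator.continually { Thread.sleep(100); Try(check) }
          .dropWhile(t => t.isFailure && System.currentTimeMillis() - start < timeoutMs)
          .next()
          .isSuccess
      }

      def main(args: Array[String]): Unit = {
        // Equivalent to IOUtils.waitForPortOpen: wait up to 5 s for a TCP port to accept connections.
        val open = waitUntilSuccess(5000) {
          new Socket(InetAddress.getByName("localhost"), 10334).close() // placeholder port
        }
        println(s"port open: $open")
      }
    }
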
