GEODE-37 change package name from io.pivotal.geode (for 
./geode-spark-connector/src/test/scala/unittest/io/pivotal) to org.apache.geode 
(moved to ./geode-spark-connector/src/test/scala/unittest/org/apache)


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1fc4e6e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1fc4e6e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1fc4e6e0

Branch: refs/heads/develop
Commit: 1fc4e6e01a3a5526f1f0d11e4c23b05536b99fdf
Parents: 97658f0
Author: Hitesh Khamesra <hkhame...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hkhame...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700

----------------------------------------------------------------------
 .../connector/ConnectorImplicitsTest.scala      |  50 -----
 .../connector/GeodeConnectionConfTest.scala     | 100 ----------
 .../connector/GeodeDStreamFunctionsTest.scala   |  79 --------
 .../spark/connector/GeodeRDDFunctionsTest.scala | 139 --------------
 .../spark/connector/LocatorHelperTest.scala     | 168 ----------------
 .../connector/rdd/GeodeRDDPartitionerTest.scala | 190 -------------------
 .../connector/rdd/GeodeRegionRDDTest.scala      | 117 ------------
 .../connector/ConnectorImplicitsTest.scala      |  50 +++++
 .../connector/GeodeConnectionConfTest.scala     | 100 ++++++++++
 .../connector/GeodeDStreamFunctionsTest.scala   |  79 ++++++++
 .../spark/connector/GeodeRDDFunctionsTest.scala | 139 ++++++++++++++
 .../spark/connector/LocatorHelperTest.scala     | 168 ++++++++++++++++
 .../connector/rdd/GeodeRDDPartitionerTest.scala | 190 +++++++++++++++++++
 .../connector/rdd/GeodeRegionRDDTest.scala      | 117 ++++++++++++
 14 files changed, 843 insertions(+), 843 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
deleted file mode 100644
index b0464cc..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import io.pivotal.geode.spark.connector._
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.Matchers
-
-class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("implicit map2Properties") {
-    verifyProperties(Map.empty)
-    verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3"))
-  }
-  
-  def verifyProperties(map: Map[String, String]): Unit = {
-    val props: java.util.Properties = map
-    assert(props.size() == map.size)
-    map.foreach(p => assert(props.getProperty(p._1) == p._2))    
-  }
-
-  test("Test Implicit SparkContext Conversion") {
-    val mockSparkContext = mock[SparkContext]
-    val gfscf: GeodeSparkContextFunctions = mockSparkContext
-    assert(gfscf.isInstanceOf[GeodeSparkContextFunctions])
-  }
-
-  test("Test Implicit SQLContext Conversion") {
-    val mockSQLContext = mock[SQLContext]
-    val gfscf: GeodeSQLContextFunctions = mockSQLContext
-    assert(gfscf.isInstanceOf[GeodeSQLContextFunctions])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
deleted file mode 100644
index a3076f4..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import org.apache.spark.SparkConf
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import io.pivotal.geode.spark.connector._
-
-class GeodeConnectionConfTest extends FunSuite with Matchers with MockitoSugar 
{
-
-  test("apply(SparkConf) w/ GeodeLocator property and empty geodeProps") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val conf = new SparkConf().set(GeodeLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
-    val connConf = GeodeConnectionConf(conf)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps.isEmpty)
-  }
-  
-  test("apply(SparkConf) w/ GeodeLocator property and geode properties") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
-    val (propK2, propV2) = ("ack-wait-threshold", "10")
-    val conf = new SparkConf().set(GeodeLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
-                              .set(s"spark.geode.$propK1", 
propV1).set(s"spark.geode.$propK2", propV2)
-    val connConf = GeodeConnectionConf(conf)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps == Map(propK1 -> propV1, propK2 -> propV2))
-  }
-
-  test("apply(SparkConf) w/o GeodeLocator property") {
-    intercept[RuntimeException] { GeodeConnectionConf(new SparkConf()) }
-  }
-
-  test("apply(SparkConf) w/ invalid GeodeLocator property") {
-    val conf = new SparkConf().set(GeodeLocatorPropKey, "local^host:1234")
-    intercept[Exception] { GeodeConnectionConf(conf) }
-  }
-
-  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non geodeProps") 
{
-    val (host1, port1) = ("host1", 1234)
-    val connConf = GeodeConnectionConf(s"$host1:$port1")
-    assert(connConf.locators == Seq((host1, port1)))
-    assert(connConf.geodeProps.isEmpty)
-  }
-
-  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non-empty 
geodeProps") {
-    val (host1, port1) = ("host1", 1234)
-    val (host2, port2) = ("host2", 5678)
-    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
-    val (propK2, propV2) = ("ack-wait-threshold", "10")
-    val props = Map(propK1 -> propV1, propK2 -> propV2)
-    val connConf = GeodeConnectionConf(s"$host1:$port1,$host2:$port2", props)
-    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
-    assert(connConf.geodeProps == props)
-  }
-
-  test("apply(locatorStr, geodeProps) w/ invalid locatorStr") {
-    intercept[Exception] { GeodeConnectionConf("local~host:4321") }
-  }
-
-  test("constructor w/ empty (host,port) pairs") {
-    intercept[IllegalArgumentException] { new GeodeConnectionConf(Seq.empty) }
-  }
-
-  test("getConnection() normal") {
-    implicit val mockFactory = mock[GeodeConnectionManager]
-    val mockConnection = mock[GeodeConnection]
-    
when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenReturn(mockConnection)
-    val connConf = GeodeConnectionConf("localhost:1234")
-    assert(connConf.getConnection == mockConnection)
-    verify(mockFactory).getConnection(connConf)
-  }
-
-  test("getConnection() failure") {
-    implicit val mockFactory = mock[GeodeConnectionManager]
-    
when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenThrow(new
 RuntimeException)
-    val connConf = GeodeConnectionConf("localhost:1234")
-    intercept[RuntimeException] { connConf.getConnection }
-    verify(mockFactory).getConnection(connConf)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
deleted file mode 100644
index d671722..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.DStream
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-import scala.reflect.ClassTag
-
-class GeodeDStreamFunctionsTest extends FunSuite with Matchers with 
MockitoSugar {
-
-  test("test GeodePairDStreamFunctions Implicit") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val mockDStream = mock[DStream[(Int, String)]]
-    // the implicit make the following line valid
-    val pairDStream: GeodePairDStreamFunctions[Int, String] = mockDStream
-    pairDStream shouldBe a[GeodePairDStreamFunctions[_, _]]
-  }
-
-  test("test GeodeDStreamFunctions Implicit") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val mockDStream = mock[DStream[String]]
-    // the implicit make the following line valid
-    val dstream: GeodeDStreamFunctions[String] = mockDStream
-    dstream shouldBe a[GeodeDStreamFunctions[_]]
-  }
-
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
-    : (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
-    val mockConnection = mock[GeodeConnection]
-    val mockConnConf = mock[GeodeConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnConf.locators).thenReturn(Seq.empty)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GeodePairDStreamFunctions.saveToGeode()") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
-    val mockDStream = mock[DStream[(String, String)]]
-    mockDStream.saveToGeode(regionPath, mockConnConf)
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
-  }
-
-  test("test GeodeDStreamFunctions.saveToGeode()") {
-    import io.pivotal.geode.spark.connector.streaming._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, Int]("test")
-    val mockDStream = mock[DStream[String]]
-    mockDStream.saveToGeode[String, Int](regionPath,  (s: String) => (s, 
s.length), mockConnConf)
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
deleted file mode 100644
index 5259198..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector._
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDWriter, 
GeodePairRDDWriter}
-import org.apache.spark.{TaskContext, SparkContext}
-import org.apache.spark.rdd.RDD
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FunSuite, Matchers}
-import collection.JavaConversions._
-import scala.reflect.ClassTag
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-
-class GeodeRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar {
-
-  test("test PairRDDFunction Implicit") {
-    import io.pivotal.geode.spark.connector._
-    val mockRDD = mock[RDD[(Int, String)]]
-    // the implicit make the following line valid
-    val pairRDD: GeodePairRDDFunctions[Int, String] = mockRDD
-    pairRDD shouldBe a [GeodePairRDDFunctions[_, _]]
-  }
-  
-  test("test RDDFunction Implicit") {
-    import io.pivotal.geode.spark.connector._
-    val mockRDD = mock[RDD[String]]
-    // the implicit make the following line valid
-    val nonPairRDD: GeodeRDDFunctions[String] = mockRDD
-    nonPairRDD shouldBe a [GeodeRDDFunctions[_]]
-  }
-
-  def createMocks[K, V](regionPath: String)
-    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): 
(String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
-    val mockConnection = mock[GeodeConnection]
-    val mockConnConf = mock[GeodeConnectionConf]
-    val mockRegion = mock[Region[K, V]]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.getRegionProxy[K, 
V](regionPath)).thenReturn(mockRegion)
-    // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath)
-    (regionPath, mockConnConf, mockConnection, mockRegion)
-  }
-
-  test("test GeodePairRDDWriter") {
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
-    val writer = new GeodePairRDDWriter[String, String](regionPath, 
mockConnConf)
-    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
-    writer.write(null, data.toIterator)
-    val expectedMap: Map[String, String] = data.toMap
-    verify(mockRegion).putAll(expectedMap)
-  }
-
-  test("test GeodeNonPairRDDWriter") {
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
-    val writer = new GeodeRDDWriter[String, Int, String](regionPath, 
mockConnConf)
-    val data = List("a", "ab", "abc")
-    val f: String => (Int, String) = s => (s.length, s)
-    writer.write(f)(null, data.toIterator)
-    val expectedMap: Map[Int, String] = data.map(f).toMap
-    verify(mockRegion).putAll(expectedMap)
-  }
-  
-  test("test PairRDDFunctions.saveToGeode") {
-    verifyPairRDDFunction(useOpConf = false)
-  }
-
-  test("test PairRDDFunctions.saveToGeode w/ opConf") {
-    verifyPairRDDFunction(useOpConf = true)
-  }
-  
-  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
-    import io.pivotal.geode.spark.connector._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
-    val mockRDD = mock[RDD[(String, String)]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = 
-      if (useOpConf) 
-        mockRDD.saveToGeode(regionPath, mockConnConf, 
Map(RDDSaveBatchSizePropKey -> "5000"))
-      else
-        mockRDD.saveToGeode(regionPath, mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
-    result === Unit
-    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => 
Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: current implementation make following code not compilable
-    //       so not negative test for this case
-    //  val rdd: RDD[(K, V)] = ...
-    //  rdd.saveToGeode(regionPath, s => (s.length, s))
-  }
-
-  test("test RDDFunctions.saveToGeode") {
-    verifyRDDFunction(useOpConf = false)
-  }
-
-  test("test RDDFunctions.saveToGeode w/ opConf") {
-    verifyRDDFunction(useOpConf = true)
-  }
-  
-  def verifyRDDFunction(useOpConf: Boolean): Unit = {
-    import io.pivotal.geode.spark.connector._
-    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
-    val mockRDD = mock[RDD[(String)]]
-    val mockSparkContext = mock[SparkContext]
-    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
-    val result = 
-      if (useOpConf)
-        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf, 
Map(RDDSaveBatchSizePropKey -> "5000"))
-      else
-        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf)
-    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
-    result === Unit
-    verify(mockSparkContext, times(1)).runJob[String, Unit](
-      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => 
Unit])(mockAny(classOf[ClassTag[Unit]]))
-
-    // Note: current implementation make following code not compilable
-    //       so not negative test for this case
-    //  val rdd: RDD[T] = ...   // T is not a (K, V) tuple
-    //  rdd.saveToGeode(regionPath)
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
deleted file mode 100644
index c775784..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector
-
-import java.net.InetAddress
-
-import io.pivotal.geode.spark.connector.internal.LocatorHelper
-import org.scalatest.FunSuite
-
-class LocatorHelperTest extends FunSuite {
-
-  test("locatorStr2HostPortPair hostname w/o domain") {
-    val (host, port) = ("localhost", 10334)
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, 
port))
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, 
port))
-  }
-
-  test("locatorStr2HostPortPair hostname w/ domain") {
-    val (host, port) = ("localhost", 10334)
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, 
port))
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, 
port))
-  }
-
-  test("locatorStr2HostPortPair w/ invalid host name") {
-    // empty or null locatorStr
-    assert(LocatorHelper.locatorStr2HostPortPair("").isFailure)
-    assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure)
-    // host name has leading `.`
-    assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure)
-    // host name has leading and/or tail white space
-    assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure)
-    assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure)
-    assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure)
-    // host name contain invalid characters
-    assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure)
-    assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure)
-    assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure)
-  }
-
-  test("locatorStr2HostPortPair w/ valid port") {
-    val host = "192.168.0.1"
-    // port has 2, 3, 4, 5 digits
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20))
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 
300))
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 
4000))
-    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 
50000))
-  }
-  
-  test("locatorStr2HostPortPair w/ invalid port") {
-    // port number is less than 2 digits
-    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure)
-    // port number is more than 5 digits
-    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure)
-    // port number is invalid
-    assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure)
-  }
-  
-  test("parseLocatorsString with valid locator(s)") {
-    val (host1, port1) = ("localhost", 10334)
-    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, 
port1)))
-    val (host2, port2) = ("localhost2", 10335)
-    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") 
== Seq((host1, port1),(host2, port2)))
-    val (host3, port3) = ("localhost2", 10336)
-    
assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3")
 == 
-      Seq((host1, port1),(host2, port2),(host3, port3)))
-  }
-
-  test("parseLocatorsString with invalid locator(s)") {
-    // empty and null locatorsStr
-    intercept[Exception] { LocatorHelper.parseLocatorsString("") }
-    intercept[Exception] { LocatorHelper.parseLocatorsString(null) }
-    // 1 bad locatorStr
-    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local%host.1234") }
-    // 1 good locatorStr and 1 bad locatorStr
-    intercept[Exception] { 
LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") }
-    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
-  }
-
-  test("pickPreferredGeodeServers: shared servers and one gf-server per host") 
{
-    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
-    val servers = Seq(srv1, srv2, srv3, srv4)
-    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
-    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, 
srv4))
-    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, 
srv1))
-    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, 
srv2))
-  }
-
-  test("pickPreferredGeodeServers: shared servers, one gf-server per host, 
un-sorted list") {
-    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
-    val servers = Seq(srv4, srv2, srv3, srv1)
-    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
-    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, 
srv4))
-    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, 
srv1))
-    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, 
srv2))
-  }
-
-  test("pickPreferredGeodeServers: shared servers and two gf-server per host") 
{
-    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), 
("host2", 4003), ("host2", 4004))
-    val servers = Seq(srv1, srv2, srv3, srv4)
-    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
-    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, 
srv3))
-    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, 
srv1))
-    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, 
srv1))
-  }
-
-  test("pickPreferredGeodeServers: shared servers, two gf-server per host, 
un-sorted server list") {
-    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), 
("host2", 4003), ("host2", 4004))
-    val servers = Seq(srv1, srv4, srv3, srv2)
-    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
-    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, 
srv3))
-    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, 
srv1))
-    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, 
srv1))
-  }
-
-  test("pickPreferredGeodeServers: no shared servers and one gf-server per 
host") {
-    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
-    val servers = Seq(srv1, srv2, srv3, srv4)
-    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, 
srv2, srv3))
-    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv3, 
srv4))
-    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv3, srv4, 
srv1))
-    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv4, srv1, 
srv2))
-  }
-
-  test("pickPreferredGeodeServers: no shared servers, one gf-server per host, 
and less gf-server") {
-    val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
-    val servers = Seq(srv1, srv2)
-    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, 
srv2))
-    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv1))
-    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv1, srv2))
-    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv2, srv1))
-
-
-    println("host name: " + InetAddress.getLocalHost.getHostName)
-    println("canonical host name: " + 
InetAddress.getLocalHost.getCanonicalHostName)
-    println("canonical host name 2: " + 
InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName)
-  }
-
-  test("pickPreferredGeodeServers: ad-hoc") {
-    val (srv4, srv5, srv6) = (
-      ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 
40411), ("w2-gst-pnq-06.gemstone.com", 40411))
-    val servers = Seq(srv6, srv5, srv4)
-    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-03.gemstone.com", 
"<driver>", Seq(srv4, srv5, srv6))
-    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-04.gemstone.com", 
"1", Seq(srv4, srv5, srv6))
-    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-05.gemstone.com", 
"0", Seq(srv5, srv6, srv4))
-    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-06.gemstone.com", 
"2", Seq(srv6, srv4, srv5))
-  }
-  
-  def verifyPickPreferredGeodeServers(
-    servers: Seq[(String, Int)], hostName: String, executorId: String, 
expectation: Seq[(String, Int)]): Unit = {
-    val result = LocatorHelper.pickPreferredGeodeServers(servers, hostName, 
executorId)
-    assert(result == expectation, s"pick servers for $hostName:$executorId")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
deleted file mode 100644
index 2f92c1d..0000000
--- 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector.rdd
-
-import org.apache.geode.distributed.internal.ServerLocation
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.rdd._
-import org.apache.spark.Partition
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-
-import java.util.{HashSet => JHashSet, HashMap => JHashMap}
-
-import scala.collection.mutable
-
-class GeodeRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar 
{
-
-  val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new 
JHashMap()
-
-  def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): 
JHashMap[ServerLocation, JHashSet[Integer]] = {
-    import scala.collection.JavaConversions._
-    val tmp = map.map {case ((host, port), set) => (new ServerLocation(host, 
port), set.map(Integer.valueOf))}
-    (new JHashMap[ServerLocation, JHashSet[Integer]]() /: tmp) { case (acc, 
(s, jset)) => acc.put(s, new JHashSet(jset)); acc }
-  }
-  
-  val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map(
-    ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> 
mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5))
-
-  
-  // update this test whenever change default setting 
-  test("default partitioned region partitioner") {
-    assert(GeodeRDDPartitioner.defaultPartitionedRegionPartitioner === 
ServerSplitsPartitioner)
-  }
-
-  // update this test whenever change default setting 
-  test("default replicated region partitioner") {
-    assert(GeodeRDDPartitioner.defaultReplicatedRegionPartitioner === 
OnePartitionPartitioner)
-  }
-  
-  test("GeodeRDDPartitioner.apply method") {
-    import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
-    for ((name, partitioner) <- partitioners) assert(GeodeRDDPartitioner(name) 
== partitioner)
-    assert(GeodeRDDPartitioner("dummy") == 
GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
-    assert(GeodeRDDPartitioner() == 
GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
-  }
-  
-  test("OnePartitionPartitioner") {
-    val mockConnection = mock[GeodeConnection]
-    val partitions = OnePartitionPartitioner.partitions[String, 
String](mockConnection, null, Map.empty)
-    verifySinglePartition(partitions)
-  }
-
-  def verifySinglePartition(partitions: Array[Partition]): Unit = {
-    assert(1 == partitions.size)
-    assert(partitions(0).index === 0)
-    assert(partitions(0).isInstanceOf[GeodeRDDPartition])
-    assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty)
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1)
-    verifyPartitions(partitions, List(
-      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1)
-    verifyPartitions(partitions, List(
-      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
-    verifyPartitions(partitions, List(
-      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1)
-    verifyPartitions(partitions, List(
-      (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), 
Seq("server2"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, 
non-continuous IDs") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
-    verifyPartitions(partitions, List(
-      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 
servers have 1, 2, and 3 buckets") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> 
mutable.Set(3, 4, 5))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2)
-    // partitions.foreach(println)
-    verifyPartitions(partitions, List(
-      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 
4), Seq("s3")), (Set(5), Seq("s3"))))
-  }
-
-  test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 
servers have 0, 2, 3, and 4 buckets") {
-    val map: List[(String, mutable.Set[Int])] = List(
-      "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> 
mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8))
-    val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3)
-    // partitions.foreach(println)
-    verifyPartitions(partitions, List(
-      (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), 
Seq("s2")), (Set(4), Seq("s2")),
-      (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) ))
-  }
-
-  test("ServerSplitsPartitioner.partitions(): metadata = None ") {
-    val regionPath = "test"
-    val mockConnection = mock[GeodeConnection]
-    intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, 
String](mockConnection, null, Map.empty) }
-  }
-
-  test("ServerSplitsPartitioner.partitions(): replicated region ") {
-    val regionPath = "test"
-    val mockConnection = mock[GeodeConnection]
-    val md = new RegionMetadata(regionPath, false, 11, null)
-    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
-    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map.empty)
-    verifySinglePartition(partitions)
-  }
-
-  test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") {
-    val regionPath = "test"
-    val mockConnection = mock[GeodeConnection]
-    val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap)
-    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
-    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map.empty)
-    verifySinglePartition(partitions)
-  }
-
-  test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data 
") {
-    import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
-    val regionPath = "test"
-    val mockConnection = mock[GeodeConnection]
-    val map: Map[(String, Int), Set[Int]] = Map(
-      ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), 
("s3",4) -> Set(3, 4, 5))
-    val md = new RegionMetadata(regionPath, true, 6, 
toJavaServerBucketMap(map))
-    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
-    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2"))
-    // partitions.foreach(println)
-    verifyPartitions(partitions, List(
-      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 
4), Seq("s3")), (Set(5), Seq("s3"))))
-  }
-  
-  // Note: since the order of partitions is not pre-determined, we have to 
verify partition id
-  // and contents separately
-  def verifyPartitions(partitions: Array[Partition], expPartitions: 
List[(Set[Int], Seq[String])]): Unit = {
-    // 1. check size
-    assert(partitions.size == expPartitions.size)
-    // 2. check IDs are 0 to n-1
-    (0 until partitions.size).toList.zip(partitions).foreach { case (id, p) => 
assert(id == p.index) }
-
-    // 3. get all pairs of bucket set and its locations, and compare to the 
expected pairs
-    val list = partitions.map { e =>
-      val p = e.asInstanceOf[GeodeRDDPartition]
-      (p.bucketSet, p.locations)
-    }
-    expPartitions.foreach(e => assert(list.contains(e)))    
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
deleted file mode 100644
index 63eddf9..0000000
--- a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package unittest.io.pivotal.geode.spark.connector.rdd
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, 
GeodeRegionRDD}
-import io.pivotal.geode.spark.connector.{GeodeConnectionConf, GeodeConnection}
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import org.mockito.Mockito._
-import org.mockito.Matchers.{eq => mockEq, any => mockAny}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FunSuite}
-
-import scala.reflect.ClassTag
-
-class GeodeRegionRDDTest extends FunSuite with Matchers with MockitoSugar {
-
-  /** create common mocks, not all mocks are used by all tests */
-  def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: 
ClassTag[V], m: Manifest[Region[K, V]])
-    : (String, Region[K,V], GeodeConnectionConf, GeodeConnection) = {
-    val mockConnection = mock[GeodeConnection]
-    val mockRegion = mock[Region[K, V]]
-    val mockConnConf = mock[GeodeConnectionConf]
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    when(mockConnection.getRegionProxy[K, 
V](regionPath)).thenReturn(mockRegion)
-    when(mockConnConf.locators).thenReturn(Seq.empty)
-    (regionPath, mockRegion, mockConnConf, mockConnection)
-  }
-  
-  test("create GeodeRDD with non-existing region") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    when(mockConnConf.getConnection).thenReturn(mockConnection)
-    
when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new 
RuntimeException)
-    val mockSparkContext = mock[SparkContext]
-    intercept[RuntimeException] { GeodeRegionRDD[String, 
String](mockSparkContext, regionPath, mockConnConf) }
-    verify(mockConnConf).getConnection
-    verify(mockConnection).validateRegion[String, String](regionPath)
-  }
-  
-  test("getPartitions with non-existing region") {
-    // region exists when RDD is created, but get removed before 
getPartitions() is invoked
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    when(mockConnection.getRegionMetadata[String, 
String](regionPath)).thenReturn(None)
-    val mockSparkContext = mock[SparkContext]
-    intercept[RuntimeException] { GeodeRegionRDD[String, 
String](mockSparkContext, regionPath, mockConnConf).getPartitions }
-  }
-
-  test("getPartitions with replicated region and not preferred env") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    when(mockConnection.getRegionMetadata[String, 
String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, 
null)))
-    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, 
mockConnConf).partitions
-    verifySinglePartition(partitions)
-  }
-
-  def verifySinglePartition(partitions: Array[Partition]): Unit = {
-    assert(1 == partitions.size)
-    assert(partitions(0).index === 0)
-    assert(partitions(0).isInstanceOf[GeodeRDDPartition])
-    assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty)
-  }
-
-  test("getPartitions with replicated region and preferred 
OnePartitionPartitioner") {
-    // since it's replicated region, so OnePartitionPartitioner will be used, 
i.e., override preferred partitioner
-    import io.pivotal.geode.spark.connector.{PreferredPartitionerPropKey, 
OnePartitionPartitionerName}
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    when(mockConnection.getRegionMetadata[String, 
String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, 
null)))
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName)
-    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, 
mockConnConf, env).partitions
-    verifySinglePartition(partitions)
-  }
-
-  test("getPartitions with partitioned region and not preferred env") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockSparkContext = mock[SparkContext]
-    when(mockConnection.getRegionMetadata[String, 
String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, 
null)))
-    val partitions = GeodeRegionRDD(mockSparkContext, regionPath, 
mockConnConf).partitions
-    verifySinglePartition(partitions)
-  }
-
-  test("GeodeRDD.compute() method") {
-    val (regionPath, mockRegion, mockConnConf, mockConnection) = 
createMocks[String, String]("test")
-    implicit val mockConnConf2 = mockConnConf
-    val mockIter = mock[Iterator[(String, String)]]
-    val partition = GeodeRDDPartition(0, Set.empty)
-    when(mockConnection.getRegionMetadata[String, 
String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, 
null)))
-    when(mockConnection.getRegionData[String, String](regionPath, None, 
partition)).thenReturn(mockIter)
-    val mockSparkContext = mock[SparkContext]
-    val rdd = GeodeRegionRDD[String, String](mockSparkContext, regionPath, 
mockConnConf)
-    val partitions = rdd.partitions
-    assert(1 == partitions.size)
-    val mockTaskContext = mock[TaskContext]
-    rdd.compute(partitions(0), mockTaskContext)        
-    verify(mockConnection).getRegionData[String, String](mockEq(regionPath), 
mockEq(None), mockEq(partition))
-    // verify(mockConnection).getRegionData[String, String](regionPath, 
Set.empty.asInstanceOf[Set[Int]], "geodeRDD 0.0")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/ConnectorImplicitsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/ConnectorImplicitsTest.scala
new file mode 100644
index 0000000..b0464cc
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/ConnectorImplicitsTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector._
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.Matchers
+
+class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("implicit map2Properties") {
+    verifyProperties(Map.empty)
+    verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3"))
+  }
+  
+  def verifyProperties(map: Map[String, String]): Unit = {
+    val props: java.util.Properties = map
+    assert(props.size() == map.size)
+    map.foreach(p => assert(props.getProperty(p._1) == p._2))    
+  }
+
+  test("Test Implicit SparkContext Conversion") {
+    val mockSparkContext = mock[SparkContext]
+    val gfscf: GeodeSparkContextFunctions = mockSparkContext
+    assert(gfscf.isInstanceOf[GeodeSparkContextFunctions])
+  }
+
+  test("Test Implicit SQLContext Conversion") {
+    val mockSQLContext = mock[SQLContext]
+    val gfscf: GeodeSQLContextFunctions = mockSQLContext
+    assert(gfscf.isInstanceOf[GeodeSQLContextFunctions])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeConnectionConfTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeConnectionConfTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeConnectionConfTest.scala
new file mode 100644
index 0000000..a3076f4
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeConnectionConfTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import io.pivotal.geode.spark.connector._
+
+class GeodeConnectionConfTest extends FunSuite with Matchers with MockitoSugar 
{
+
+  test("apply(SparkConf) w/ GeodeLocator property and empty geodeProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val conf = new SparkConf().set(GeodeLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
+    val connConf = GeodeConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps.isEmpty)
+  }
+  
+  test("apply(SparkConf) w/ GeodeLocator property and geode properties") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    val conf = new SparkConf().set(GeodeLocatorPropKey, 
s"$host1[$port1],$host2[$port2]")
+                              .set(s"spark.geode.$propK1", 
propV1).set(s"spark.geode.$propK2", propV2)
+    val connConf = GeodeConnectionConf(conf)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps == Map(propK1 -> propV1, propK2 -> propV2))
+  }
+
+  test("apply(SparkConf) w/o GeodeLocator property") {
+    intercept[RuntimeException] { GeodeConnectionConf(new SparkConf()) }
+  }
+
+  test("apply(SparkConf) w/ invalid GeodeLocator property") {
+    val conf = new SparkConf().set(GeodeLocatorPropKey, "local^host:1234")
+    intercept[Exception] { GeodeConnectionConf(conf) }
+  }
+
+  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non geodeProps") 
{
+    val (host1, port1) = ("host1", 1234)
+    val connConf = GeodeConnectionConf(s"$host1:$port1")
+    assert(connConf.locators == Seq((host1, port1)))
+    assert(connConf.geodeProps.isEmpty)
+  }
+
+  test("apply(locatorStr, geodeProps) w/ valid locatorStr and non-empty 
geodeProps") {
+    val (host1, port1) = ("host1", 1234)
+    val (host2, port2) = ("host2", 5678)
+    val (propK1, propV1) = ("ack-severe-alert-threshold", "1")
+    val (propK2, propV2) = ("ack-wait-threshold", "10")
+    val props = Map(propK1 -> propV1, propK2 -> propV2)
+    val connConf = GeodeConnectionConf(s"$host1:$port1,$host2:$port2", props)
+    assert(connConf.locators == Seq((host1, port1),(host2, port2)))
+    assert(connConf.geodeProps == props)
+  }
+
+  test("apply(locatorStr, geodeProps) w/ invalid locatorStr") {
+    intercept[Exception] { GeodeConnectionConf("local~host:4321") }
+  }
+
+  test("constructor w/ empty (host,port) pairs") {
+    intercept[IllegalArgumentException] { new GeodeConnectionConf(Seq.empty) }
+  }
+
+  test("getConnection() normal") {
+    implicit val mockFactory = mock[GeodeConnectionManager]
+    val mockConnection = mock[GeodeConnection]
+    
when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenReturn(mockConnection)
+    val connConf = GeodeConnectionConf("localhost:1234")
+    assert(connConf.getConnection == mockConnection)
+    verify(mockFactory).getConnection(connConf)
+  }
+
+  test("getConnection() failure") {
+    implicit val mockFactory = mock[GeodeConnectionManager]
+    
when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenThrow(new
 RuntimeException)
+    val connConf = GeodeConnectionConf("localhost:1234")
+    intercept[RuntimeException] { connConf.getConnection }
+    verify(mockFactory).getConnection(connConf)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeDStreamFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeDStreamFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeDStreamFunctionsTest.scala
new file mode 100644
index 0000000..d671722
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeDStreamFunctionsTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import org.apache.geode.cache.Region
+import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+import scala.reflect.ClassTag
+
+class GeodeDStreamFunctionsTest extends FunSuite with Matchers with 
MockitoSugar {
+
+  test("test GeodePairDStreamFunctions Implicit") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val mockDStream = mock[DStream[(Int, String)]]
+    // the implicit make the following line valid
+    val pairDStream: GeodePairDStreamFunctions[Int, String] = mockDStream
+    pairDStream shouldBe a[GeodePairDStreamFunctions[_, _]]
+  }
+
+  test("test GeodeDStreamFunctions Implicit") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val mockDStream = mock[DStream[String]]
+    // the implicit make the following line valid
+    val dstream: GeodeDStreamFunctions[String] = mockDStream
+    dstream shouldBe a[GeodeDStreamFunctions[_]]
+  }
+
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]])
+    : (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
+    val mockConnection = mock[GeodeConnection]
+    val mockConnConf = mock[GeodeConnectionConf]
+    val mockRegion = mock[Region[K, V]]
+    when(mockConnConf.getConnection).thenReturn(mockConnection)
+    when(mockConnConf.locators).thenReturn(Seq.empty)
+    (regionPath, mockConnConf, mockConnection, mockRegion)
+  }
+
+  test("test GeodePairDStreamFunctions.saveToGeode()") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
+    val mockDStream = mock[DStream[(String, String)]]
+    mockDStream.saveToGeode(regionPath, mockConnConf)
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit])
+  }
+
+  test("test GeodeDStreamFunctions.saveToGeode()") {
+    import io.pivotal.geode.spark.connector.streaming._
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, Int]("test")
+    val mockDStream = mock[DStream[String]]
+    mockDStream.saveToGeode[String, Int](regionPath,  (s: String) => (s, 
s.length), mockConnConf)
+    verify(mockConnConf).getConnection
+    verify(mockConnection).validateRegion[String, String](regionPath)
+    verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit])
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeRDDFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeRDDFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeRDDFunctionsTest.scala
new file mode 100644
index 0000000..5259198
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/GeodeRDDFunctionsTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import org.apache.geode.cache.Region
+import io.pivotal.geode.spark.connector._
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDWriter, 
GeodePairRDDWriter}
+import org.apache.spark.{TaskContext, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FunSuite, Matchers}
+import collection.JavaConversions._
+import scala.reflect.ClassTag
+import org.mockito.Matchers.{eq => mockEq, any => mockAny}
+
+class GeodeRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar {
+
+  test("test PairRDDFunction Implicit") {
+    import io.pivotal.geode.spark.connector._
+    val mockRDD = mock[RDD[(Int, String)]]
+    // the implicit make the following line valid
+    val pairRDD: GeodePairRDDFunctions[Int, String] = mockRDD
+    pairRDD shouldBe a [GeodePairRDDFunctions[_, _]]
+  }
+  
+  test("test RDDFunction Implicit") {
+    import io.pivotal.geode.spark.connector._
+    val mockRDD = mock[RDD[String]]
+    // the implicit make the following line valid
+    val nonPairRDD: GeodeRDDFunctions[String] = mockRDD
+    nonPairRDD shouldBe a [GeodeRDDFunctions[_]]
+  }
+
+  /**
+   * Builds the mock fixture shared by the tests in this suite.
+   *
+   * The connection-conf mock hands out the connection mock, and the connection mock
+   * hands out the region mock as the region proxy for `regionPath`.
+   *
+   * @return (regionPath, connection-conf mock, connection mock, region mock)
+   */
+  def createMocks[K, V](regionPath: String)
+    (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = {
+    val region = mock[Region[K, V]]
+    val connection = mock[GeodeConnection]
+    val connConf = mock[GeodeConnectionConf]
+    when(connection.getRegionProxy[K, V](regionPath)).thenReturn(region)
+    when(connConf.getConnection).thenReturn(connection)
+    (regionPath, connConf, connection, region)
+  }
+
+  test("test GeodePairRDDWriter") {
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[String, String]("test")
+    val writer = new GeodePairRDDWriter[String, String](regionPath, 
mockConnConf)
+    val data = List(("1", "one"), ("2", "two"), ("3", "three"))
+    writer.write(null, data.toIterator)
+    val expectedMap: Map[String, String] = data.toMap
+    verify(mockRegion).putAll(expectedMap)
+  }
+
+  test("test GeodeNonPairRDDWriter") {
+    val (regionPath, mockConnConf, mockConnection, mockRegion) = 
createMocks[Int, String]("test")
+    val writer = new GeodeRDDWriter[String, Int, String](regionPath, 
mockConnConf)
+    val data = List("a", "ab", "abc")
+    val f: String => (Int, String) = s => (s.length, s)
+    writer.write(f)(null, data.toIterator)
+    val expectedMap: Map[Int, String] = data.map(f).toMap
+    verify(mockRegion).putAll(expectedMap)
+  }
+  
+  test("test PairRDDFunctions.saveToGeode") {
+    verifyPairRDDFunction(useOpConf = false)
+  }
+
+  test("test PairRDDFunctions.saveToGeode w/ opConf") {
+    verifyPairRDDFunction(useOpConf = true)
+  }
+  
+  /**
+   * Calls `saveToGeode` on a mocked pair RDD and verifies that the region is validated
+   * exactly once and that exactly one job is submitted to the (mocked) SparkContext.
+   *
+   * @param useOpConf when true, an operation config carrying `RDDSaveBatchSizePropKey` is passed
+   */
+  def verifyPairRDDFunction(useOpConf: Boolean): Unit = {
+    import io.pivotal.geode.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[String, String]("test")
+    val mockRDD = mock[RDD[(String, String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    val result =
+      if (useOpConf)
+        mockRDD.saveToGeode(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
+      else
+        mockRDD.saveToGeode(regionPath, mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[String, String](regionPath)
+    // compare against the unit value inside assert(); a bare `result === Unit` only
+    // produces a discarded Boolean (and compares with the Unit companion object)
+    assert(result === (()))
+    verify(mockSparkContext, times(1)).runJob[(String, String), Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code fail to compile,
+    //       so there is no negative test for this case
+    //  val rdd: RDD[(K, V)] = ...
+    //  rdd.saveToGeode(regionPath, s => (s.length, s))
+  }
+
+  test("test RDDFunctions.saveToGeode") {
+    verifyRDDFunction(useOpConf = false)
+  }
+
+  test("test RDDFunctions.saveToGeode w/ opConf") {
+    verifyRDDFunction(useOpConf = true)
+  }
+  
+  /**
+   * Calls `saveToGeode` with a String => (Int, String) mapping function on a mocked
+   * non-pair RDD and verifies that the region is validated exactly once and that
+   * exactly one job is submitted to the (mocked) SparkContext.
+   *
+   * @param useOpConf when true, an operation config carrying `RDDSaveBatchSizePropKey` is passed
+   */
+  def verifyRDDFunction(useOpConf: Boolean): Unit = {
+    import io.pivotal.geode.spark.connector._
+    val (regionPath, mockConnConf, mockConnection, _) = createMocks[Int, String]("test")
+    val mockRDD = mock[RDD[(String)]]
+    val mockSparkContext = mock[SparkContext]
+    when(mockRDD.sparkContext).thenReturn(mockSparkContext)
+    val result =
+      if (useOpConf)
+        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000"))
+      else
+        mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf)
+    verify(mockConnection, times(1)).validateRegion[Int, String](regionPath)
+    // compare against the unit value inside assert(); a bare `result === Unit` only
+    // produces a discarded Boolean (and compares with the Unit companion object)
+    assert(result === (()))
+    verify(mockSparkContext, times(1)).runJob[String, Unit](
+      mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]]))
+
+    // Note: the current implementation makes the following code fail to compile,
+    //       so there is no negative test for this case
+    //  val rdd: RDD[T] = ...   // T is not a (K, V) tuple
+    //  rdd.saveToGeode(regionPath)
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/LocatorHelperTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/LocatorHelperTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/LocatorHelperTest.scala
new file mode 100644
index 0000000..c775784
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/LocatorHelperTest.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector
+
+import java.net.InetAddress
+
+import io.pivotal.geode.spark.connector.internal.LocatorHelper
+import org.scalatest.FunSuite
+
+class LocatorHelperTest extends FunSuite {
+
+  test("locatorStr2HostPortPair hostname w/o domain") {
+    val (host, port) = ("localhost", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, 
port))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, 
port))
+  }
+
+  test("locatorStr2HostPortPair hostname w/ domain") {
+    // use a dotted host name here; the bare "localhost" case belongs to the w/o-domain test
+    val (host, port) = ("localhost.localdomain", 10334)
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get == (host, port))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get == (host, port))
+  }
+
+  test("locatorStr2HostPortPair w/ invalid host name") {
+    // empty or null locatorStr
+    assert(LocatorHelper.locatorStr2HostPortPair("").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure)
+    // Every input below ends in ":1234" (the separator and port shape used by the
+    // valid-input tests), so the host name is the only part that can be rejected.
+    // host name has leading `.`
+    assert(LocatorHelper.locatorStr2HostPortPair(".localhost:1234").isFailure)
+    // host name has leading and/or tail white space
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost :1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair(" localhost :1234").isFailure)
+    // host name contain invalid characters
+    assert(LocatorHelper.locatorStr2HostPortPair("local%host:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost*:1234").isFailure)
+    assert(LocatorHelper.locatorStr2HostPortPair("^localhost:1234").isFailure)
+  }
+
+  test("locatorStr2HostPortPair w/ valid port") {
+    val host = "192.168.0.1"
+    // port has 2, 3, 4, 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 
300))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 
4000))
+    assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 
50000))
+  }
+  
+  test("locatorStr2HostPortPair w/ invalid port") {
+    // The host and the ":" separator are well-formed, so only the port can be rejected.
+    // port number is less than 2 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:9").isFailure)
+    // port number is more than 5 digits
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:100000").isFailure)
+    // port number is invalid
+    assert(LocatorHelper.locatorStr2HostPortPair("localhost:1xx1").isFailure)
+  }
+  
+  test("parseLocatorsString with valid locator(s)") {
+    val (host1, port1) = ("localhost", 10334)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, 
port1)))
+    val (host2, port2) = ("localhost2", 10335)
+    assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") 
== Seq((host1, port1),(host2, port2)))
+    val (host3, port3) = ("localhost2", 10336)
+    
assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3")
 == 
+      Seq((host1, port1),(host2, port2),(host3, port3)))
+  }
+
+  test("parseLocatorsString with invalid locator(s)") {
+    // empty and null locatorsStr
+    intercept[Exception] { LocatorHelper.parseLocatorsString("") }
+    intercept[Exception] { LocatorHelper.parseLocatorsString(null) }
+    // 1 bad locatorStr
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local%host.1234") }
+    // 1 good locatorStr and 1 bad locatorStr
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") }
+    intercept[Exception] { 
LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") }
+  }
+
+  test("pickPreferredGeodeServers: shared servers and one gf-server per host") 
{
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, 
srv4))
+    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, 
srv1))
+    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, 
srv2))
+  }
+
+  test("pickPreferredGeodeServers: shared servers, one gf-server per host, 
un-sorted list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
+    val servers = Seq(srv4, srv2, srv3, srv1)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, 
srv4))
+    verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, 
srv1))
+    verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, 
srv2))
+  }
+
+  test("pickPreferredGeodeServers: shared servers and two gf-server per host") 
{
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), 
("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, 
srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, 
srv1))
+    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, 
srv1))
+  }
+
+  test("pickPreferredGeodeServers: shared servers, two gf-server per host, 
un-sorted server list") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), 
("host2", 4003), ("host2", 4004))
+    val servers = Seq(srv1, srv4, srv3, srv2)
+    verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, 
srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, 
srv3))
+    verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, 
srv1))
+    verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, 
srv1))
+  }
+
+  test("pickPreferredGeodeServers: no shared servers and one gf-server per 
host") {
+    val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), 
("host3", 4003),("host4", 4004))
+    val servers = Seq(srv1, srv2, srv3, srv4)
+    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, 
srv2, srv3))
+    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv3, 
srv4))
+    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv3, srv4, 
srv1))
+    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv4, srv1, 
srv2))
+  }
+
+  // Only two servers are available: each expected list holds both of them and only
+  // the order differs between the host/executor combinations.
+  test("pickPreferredGeodeServers: no shared servers, one gf-server per host, and less gf-server") {
+    val (srv1, srv2) = (("host1", 4001), ("host2", 4002))
+    val servers = Seq(srv1, srv2)
+    verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, srv2))
+    verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv1))
+    verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv1, srv2))
+    verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv2, srv1))
+  }
+
+  test("pickPreferredGeodeServers: ad-hoc") {
+    val (srv4, srv5, srv6) = (
+      ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 
40411), ("w2-gst-pnq-06.gemstone.com", 40411))
+    val servers = Seq(srv6, srv5, srv4)
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-03.gemstone.com", 
"<driver>", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-04.gemstone.com", 
"1", Seq(srv4, srv5, srv6))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-05.gemstone.com", 
"0", Seq(srv5, srv6, srv4))
+    verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-06.gemstone.com", 
"2", Seq(srv6, srv4, srv5))
+  }
+  
+  /**
+   * Asserts that `LocatorHelper.pickPreferredGeodeServers` returns `expectation`
+   * for the given server list, host name and executor id.
+   */
+  def verifyPickPreferredGeodeServers(
+    servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = {
+    val clue = s"pick servers for $hostName:$executorId"
+    val picked = LocatorHelper.pickPreferredGeodeServers(servers, hostName, executorId)
+    assert(picked == expectation, clue)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1fc4e6e0/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
new file mode 100644
index 0000000..2f92c1d
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/org/apache/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unittest.io.pivotal.geode.spark.connector.rdd
+
+import org.apache.geode.distributed.internal.ServerLocation
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.rdd._
+import org.apache.spark.Partition
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, FunSuite}
+
+import java.util.{HashSet => JHashSet, HashMap => JHashMap}
+
+import scala.collection.mutable
+
+class GeodeRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar 
{
+
+  val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new 
JHashMap()
+
+  /**
+   * Converts a Scala map of (host, port) -> bucket ids into a java.util.HashMap
+   * keyed by ServerLocation, with the bucket ids boxed into a java.util.HashSet.
+   */
+  def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = {
+    import scala.collection.JavaConverters._
+    val javaMap = new JHashMap[ServerLocation, JHashSet[Integer]]()
+    map.foreach { case ((host, port), buckets) =>
+      val boxed: Set[Integer] = buckets.map(b => Integer.valueOf(b))
+      javaMap.put(new ServerLocation(host, port), new JHashSet[Integer](boxed.asJava))
+    }
+    javaMap
+  }
+  
+  val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map(
+    ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> 
mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5))
+
+  
+  // update this test whenever the default setting changes
+  test("default partitioned region partitioner") {
+    assert(GeodeRDDPartitioner.defaultPartitionedRegionPartitioner === 
ServerSplitsPartitioner)
+  }
+
+  // update this test whenever the default setting changes
+  test("default replicated region partitioner") {
+    assert(GeodeRDDPartitioner.defaultReplicatedRegionPartitioner === 
OnePartitionPartitioner)
+  }
+  
+  test("GeodeRDDPartitioner.apply method") {
+    import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
+    for ((name, partitioner) <- partitioners) assert(GeodeRDDPartitioner(name) 
== partitioner)
+    assert(GeodeRDDPartitioner("dummy") == 
GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
+    assert(GeodeRDDPartitioner() == 
GeodeRDDPartitioner.defaultPartitionedRegionPartitioner)
+  }
+  
+  test("OnePartitionPartitioner") {
+    val mockConnection = mock[GeodeConnection]
+    val partitions = OnePartitionPartitioner.partitions[String, 
String](mockConnection, null, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  /** Asserts that `partitions` holds exactly one GeodeRDDPartition with index 0 and an empty bucket set. */
+  def verifySinglePartition(partitions: Array[Partition]): Unit = {
+    assert(1 == partitions.size)
+    val only = partitions(0)
+    assert(only.index === 0)
+    assert(only.isInstanceOf[GeodeRDDPartition])
+    val geodePartition = only.asInstanceOf[GeodeRDDPartition]
+    assert(geodePartition.bucketSet.isEmpty)
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), 
Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, 
non-continuous IDs") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1)
+    verifyPartitions(partitions, List(
+      (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 
servers have 1, 2, and 3 buckets") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> 
mutable.Set(3, 4, 5))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2)
+    // partitions.foreach(println)
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 
4), Seq("s3")), (Set(5), Seq("s3"))))
+  }
+
+  test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 
servers have 0, 2, 3, and 4 buckets") {
+    val map: List[(String, mutable.Set[Int])] = List(
+      "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> 
mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8))
+    val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3)
+    // partitions.foreach(println)
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), 
Seq("s2")), (Set(4), Seq("s2")),
+      (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) ))
+  }
+
+  test("ServerSplitsPartitioner.partitions(): metadata = None ") {
+    val mockConnection = mock[GeodeConnection]
+    // a null region metadata argument is expected to be rejected with a RuntimeException
+    intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) }
+  }
+
+  test("ServerSplitsPartitioner.partitions(): replicated region ") {
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val md = new RegionMetadata(regionPath, false, 11, null)
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") {
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap)
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map.empty)
+    verifySinglePartition(partitions)
+  }
+
+  test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data 
") {
+    import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
+    val regionPath = "test"
+    val mockConnection = mock[GeodeConnection]
+    val map: Map[(String, Int), Set[Int]] = Map(
+      ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), 
("s3",4) -> Set(3, 4, 5))
+    val md = new RegionMetadata(regionPath, true, 6, 
toJavaServerBucketMap(map))
+    when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md))
+    val partitions = ServerSplitsPartitioner.partitions[String, 
String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2"))
+    // partitions.foreach(println)
+    verifyPartitions(partitions, List(
+      (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 
4), Seq("s3")), (Set(5), Seq("s3"))))
+  }
+  
+  /**
+   * Checks `partitions` against the expected (bucket set, locations) pairs.
+   *
+   * Note: since the order of partitions is not pre-determined, partition ids and
+   * partition contents are verified separately.
+   */
+  def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = {
+    // 1. same number of partitions as expected
+    assert(partitions.size == expPartitions.size)
+    // 2. partition indexes run from 0 to n-1, in array order
+    partitions.zipWithIndex.foreach { case (p, id) => assert(id == p.index) }
+
+    // 3. collect every (bucket set, locations) pair and make sure each expected pair is present
+    val list = partitions
+      .map(_.asInstanceOf[GeodeRDDPartition])
+      .map(p => (p.bucketSet, p.locations))
+    for (e <- expPartitions) assert(list.contains(e))
+  }
+
+}


Reply via email to