http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
deleted file mode 100644
index f971a3e..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import org.apache.spark.{TaskContext, Partition}
-import org.apache.spark.rdd.RDD
-import scala.collection.JavaConversions._
-
-/**
- * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T]
- * and the specified Geode Region[K, V].
- */
-class GeodeJoinRDD[T, K, V] private[connector]
-  ( left: RDD[T],
-    func: T => K,
-    val regionPath: String,
-    val connConf: GeodeConnectionConf
-  ) extends RDD[(T, V)](left.context, left.dependencies) {
-
-  /** validate region existence when GeodeRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  override protected def getPartitions: Array[Partition] = left.partitions
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = {
-    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    if (func == null) computeWithoutFunc(split, context, region)
-    else computeWithFunc(split, context, region)
-  }
-
-  /** T is (K, V1) since there's no map function `func` */
-  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
-    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
-    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry, so remove those entries
-    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
-    leftPairs.filter{case (k, v) => rightPairs.contains(k)}
-             .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator
-  }
-  
-  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = {
-    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
-    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry, so remove those entries
-    val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null}
-    leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator
-  }
-}
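
A minimal sketch of how GeodeJoinRDD is driven. Because the constructor is private[connector], the sketch assumes it compiles inside the connector's internal.rdd package; the locator string format (host[port]), the "orders" region (assumed Region[Int, String]), and the sample data are illustrative assumptions only.

    package io.pivotal.geode.spark.connector.internal.rdd

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import io.pivotal.geode.spark.connector.GeodeConnectionConf

    object GeodeJoinRDDSketch {
      def run(sc: SparkContext): RDD[(Int, String)] = {
        // assumed locator and region; the region is expected to be a Region[Int, String]
        val connConf = GeodeConnectionConf("localhost[10334]")
        val orderIds: RDD[Int] = sc.parallelize(1 to 10)
        // func maps each left element to the region key used for the lookup; elements
        // whose key has no region entry are dropped (inner-join semantics)
        new GeodeJoinRDD[Int, Int, String](orderIds, (id: Int) => id, "orders", connConf)
      }
    }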

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
deleted file mode 100644
index 04855c1..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import org.apache.spark.{TaskContext, Partition}
-import org.apache.spark.rdd.RDD
-import scala.collection.JavaConversions._
-
-/**
- * An `RDD[ T, Option[V] ]` that represents the result of a left outer join 
- * between `left` RDD[T] and the specified Geode Region[K, V].
- */
-class GeodeOuterJoinRDD[T, K, V] private[connector]
- ( left: RDD[T],
-   func: T => K,
-   val regionPath: String,
-   val connConf: GeodeConnectionConf
-  ) extends RDD[(T, Option[V])](left.context, left.dependencies) {
-
-  /** validate region existence when GeodeRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  override protected def getPartitions: Array[Partition] = left.partitions
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = {
-    val region = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    if (func == null) computeWithoutFunc(split, context, region)
-    else computeWithFunc(split, context, region)
-  }
-
-  /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */
-  private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
-    val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]]
-    val leftKeys = leftPairs.map { case (k, v) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry
-    val rightPairs = region.getAll(leftKeys)
-    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
-    leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator
-  }
-
-  private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = {
-    val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t)))
-    val leftKeys = leftPairs.map { case (t, k) => k}.toSet
-    // Note: get all will return (key, null) for non-exist entry
-    val rightPairs = region.getAll(leftKeys)
-    // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option
-    leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
deleted file mode 100644
index 24fe72e..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import org.apache.spark.Partition
-
-/**
- * This serializable class represents a GeodeRDD partition. Each partition is mapped
- * to one or more buckets of region. The GeodeRDD can materialize the data of the
- * partition based on all information contained here.
- * @param partitionId partition id, a 0 based number.
- * @param bucketSet region bucket id set for this partition. Set.empty means whole
- *                  region (used for replicated region)
- * @param locations preferred location for this partition                  
- */
-case class GeodeRDDPartition (
-  partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil)
-  extends Partition  {
-  
-  override def index: Int = partitionId
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
deleted file mode 100644
index d960cab..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import org.apache.spark.{Logging, Partition}
-
-import scala.reflect.ClassTag
-
-/**
- * A GeodeRDD partitioner is used to partition the region into multiple RDD partitions.
- */
-trait GeodeRDDPartitioner extends Serializable {
-
-  def name: String
-  
-  /** the function that generates partitions */
-  def partitions[K: ClassTag, V: ClassTag]
-    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition]
-}
-
-object GeodeRDDPartitioner extends Logging {
-
-  /** To add new partitioner, just add it to the following list */
-  final val partitioners: Map[String, GeodeRDDPartitioner] =
-    List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap
-
-  /**
-   * Get a partitioner based on given name, a default partitioner will be returned if there's
-   * no partitioner for the given name. 
-   */
-  def apply(name: String = defaultPartitionedRegionPartitioner.name): GeodeRDDPartitioner = {
-    val p = partitioners.get(name)
-    if (p.isDefined) p.get else {
-      logWarning(s"Invalid preferred partitioner name $name.")
-      defaultPartitionedRegionPartitioner
-    }
-  }
-
-  val defaultReplicatedRegionPartitioner = OnePartitionPartitioner
-
-  val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner
-
-}
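
A small sketch of the name-based lookup defined above; the partitioner names come from the implementations in the next file, and an unrecognized name falls back to the default after logging a warning.

    import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner

    object PartitionerLookupSketch extends App {
      val byName   = GeodeRDDPartitioner("ServerSplits")   // ServerSplitsPartitioner
      val default  = GeodeRDDPartitioner()                  // defaultPartitionedRegionPartitioner
      val fallback = GeodeRDDPartitioner("NoSuchName")      // warns, then returns the default
    }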

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
deleted file mode 100644
index 4606114..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import io.pivotal.geode.spark.connector.GeodeConnection
-import io.pivotal.geode.spark.connector.internal.RegionMetadata
-import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey
-import org.apache.spark.Partition
-import scala.collection.JavaConversions._
-import scala.collection.immutable.SortedSet
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-/** This partitioner maps whole region to one GeodeRDDPartition */
-object OnePartitionPartitioner extends GeodeRDDPartitioner {
-
-  override val name = "OnePartition"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-    (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] =
-    Array[Partition](new GeodeRDDPartition(0, Set.empty))
-}
-
-/**
-  * This partitioner maps the whole region to N * M Geode RDD partitions, where M is the number of
-  * Geode servers that contain the data for the given region. The default value of N is 2.
-  */
-object ServerSplitsPartitioner extends GeodeRDDPartitioner {
-
-  override val name = "ServerSplits"
-
-  override def partitions[K: ClassTag, V: ClassTag]
-  (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = {
-    if (md == null) throw new RuntimeException("RegionMetadata is null")
-    val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 }
-    if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty)
-      Array[Partition](new GeodeRDDPartition(0, Set.empty))
-    else {
-      val map = mapAsScalaMap(md.getServerBucketMap)
-        .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList
-        .map { case (srv, set) => (srv.getHostName, set) }
-       doPartitions(map, md.getTotalBuckets, n)
-    }
-  }
-
-  /** Converts server to bucket ID set list to array of RDD partitions */
-  def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int)
-    : Array[Partition] = {
-
-    // method that calculates the group size for splitting "k" items into "g" groups
-    def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt
-
-    // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs
-    val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) }
-
-    // 2. split bucket set of each server into n splits if possible, and server to Seq(server)
-    val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) =>
-      if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) }
-
-    // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets
-    val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2}
-
-    // 4. distribute empty bucket IDs to all partitions evenly.
-    //    The empty buckets do not contain data when partitions are created, but they may contain data
-    //    when RDD is materialized, so need to include those bucket IDs in the partitions.
-    val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet
-      else srvToSplitedBuckeSet.zipAll(
-        emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map
-          { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) }
-
-    // 5. create array of partitions w/ 0-based index
-    (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map
-      { case (i, (srv, set)) => new GeodeRDDPartition(i, set, srv) }.toArray
-  }
-}
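
A worked sketch of doPartitions with made-up inputs: two servers hosting six of a region's seven buckets, and n = 2 splits per server; bucket 6 has no hosting server yet, so it is folded into the first split.

    import scala.collection.mutable
    import io.pivotal.geode.spark.connector.internal.rdd.ServerSplitsPartitioner

    object DoPartitionsSketch extends App {
      // hypothetical server -> bucket-id map for a region with 7 buckets
      val serverBuckets = List(
        "server1" -> mutable.Set(0, 1, 2, 3),
        "server2" -> mutable.Set(4, 5))

      val parts = ServerSplitsPartitioner.doPartitions(serverBuckets, totalBuckets = 7, n = 2)
      parts.foreach(println)
      // roughly: GeodeRDDPartition(0, {0, 1, 6}, Seq(server1)), GeodeRDDPartition(1, {2, 3}, Seq(server1)),
      //          GeodeRDDPartition(2, {4}, Seq(server2)),       GeodeRDDPartition(3, {5}, Seq(server2))
    }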

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
deleted file mode 100644
index 27559f3..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import org.apache.geode.cache.Region
-import io.pivotal.geode.spark.connector._
-import org.apache.spark.{Logging, TaskContext}
-
-import scala.collection.Iterator
-import java.util.{HashMap => JMap}
-
-/** This trait provides some common code for pair and non-pair RDD writers */
-private[rdd] abstract class GeodeRDDWriterBase (opConf: Map[String, String]) extends Serializable {
-
-  val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt}
-                  catch { case e: NumberFormatException => RDDSaveBatchSizeDefault }
-
-  def mapDump(map: Map[_, _], num: Int): String = {
-    val firstNum = map.take(num + 1)
-    if (firstNum.size > num) s"$firstNum ..." else s"$firstNum"    
-  }  
-}
-
-/**
- * Writer object that provides write function that saves non-pair RDD partitions to Geode.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GeodeRDDWriter[T, K, V] 
-  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
-  }
-}
-
-
-/**
- * Writer object that provides write function that saves pair RDD partitions to Geode.
- * Those functions will be executed on Spark executors.
- * @param regionPath the full path of the region where the data is written to
- */
-class GeodePairRDDWriter[K, V]
-  (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-  extends GeodeRDDWriterBase(opConf) with Serializable with Logging {
-
-  def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = {
-    val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath)
-    var count = 0
-    val chunks = data.grouped(batchSize)
-    chunks.foreach { chunk =>
-      val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m}
-      region.putAll(map)
-      count += chunk.length
-    }
-    logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath")
-  }
-}
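
A sketch of driving the pair writer from a Spark job, mirroring the runJob call that the DStream functions later in this commit use; the locator, region name, and batch size are placeholder values.

    import org.apache.spark.SparkContext
    import io.pivotal.geode.spark.connector.{GeodeConnectionConf, RDDSaveBatchSizePropKey}
    import io.pivotal.geode.spark.connector.internal.rdd.GeodePairRDDWriter

    object PairWriterSketch {
      def save(sc: SparkContext): Unit = {
        val connConf = GeodeConnectionConf("localhost[10334]")   // placeholder locator
        val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
        // entries are putAll'ed to the region in chunks of rdd.save.batch.size entries
        val writer = new GeodePairRDDWriter[String, Int](
          "counts", connConf, Map(RDDSaveBatchSizePropKey -> "500"))
        sc.runJob(counts, writer.write _)
      }
    }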
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
deleted file mode 100644
index 6980c0f..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.rdd
-
-import scala.collection.Seq
-import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition, SparkContext}
-import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey}
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._
-
-/**
- * This class exposes Geode region as a RDD.
- * @param sc the Spark Context
- * @param regionPath the full path of the region
- * @param connConf the GeodeConnectionConf to access the region
- * @param opConf the parameters for this operation, such as preferred partitioner.
- */
-class GeodeRegionRDD[K, V] private[connector]
-  (@transient sc: SparkContext,
-   val regionPath: String,
-   val connConf: GeodeConnectionConf,
-   val opConf: Map[String, String] = Map.empty,
-   val whereClause: Option[String] = None 
-  ) (implicit ctk: ClassTag[K], ctv: ClassTag[V])
-  extends RDD[(K, V)](sc, Seq.empty) {
-
-  /** validate region existence when GeodeRDD object is created */
-  validate()
-
-  /** Validate region, and make sure it exists. */
-  private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath)
-
-  def kClassTag = ctk
-  
-  def vClassTag = ctv
-
-  /**
-   * method `copy` is used by method `where` that creates new immutable
-   * GeodeRDD instance based on this instance.
-   */
-  private def copy(
-    regionPath: String = regionPath,
-    connConf: GeodeConnectionConf = connConf,
-    opConf: Map[String, String] = opConf,
-    whereClause: Option[String] = None
-  ): GeodeRegionRDD[K, V] = {
-
-    require(sc != null,
-    """RDD transformation requires a non-null SparkContext. Unfortunately
-      |SparkContext in this GeodeRDD is null. This can happen after 
-      |GeodeRDD has been deserialized. SparkContext is not Serializable,
-      |therefore it deserializes to null. RDD transformations are not allowed
-      |inside lambdas used in other RDD transformations.""".stripMargin )
-
-    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause)
-  }
-
-  /** When where clause is specified, OQL query
-    * `select key, value from /<region-path>.entries where <where clause> `
-    * is used to filter the dataset.
-    */
-  def where(whereClause: Option[String]): GeodeRegionRDD[K, V] = {
-    if (whereClause.isDefined) copy(whereClause = whereClause)
-    else this
-  }
-
-  /** this version is for Java API that doesn't use scala.Option */
-  def where(whereClause: String): GeodeRegionRDD[K, V] = {
-    if (whereClause == null || whereClause.trim.isEmpty) this
-    else copy(whereClause = Option(whereClause.trim))
-  }
-
-  /**
-   * Use the preferred partitioner to generate partitions. `defaultReplicatedRegionPartitioner`
-   * will be used if it's a replicated region. 
-   */
-  override def getPartitions: Array[Partition] = {
-    val conn = connConf.getConnection
-    val md = conn.getRegionMetadata[K, V](regionPath)
-    md match {
-      case None => throw new RuntimeException(s"region $regionPath was not found.")
-      case Some(data) =>
-        logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""")
-        val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner
-        val splits = p.partitions[K, V](conn, data, opConf)
-        logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n  ${splits.mkString("\n  ")}""")
-        splits
-    }
-  }
-
-  /**
-   * provide preferred location(s) (host name(s)) of the given partition. 
-   * Only some partitioner implementation(s) provides this info, which is
-   * useful when Spark cluster and Geode cluster share some hosts.
-   */
-  override def getPreferredLocations(split: Partition) =
-    split.asInstanceOf[GeodeRDDPartition].locations
-
-  /**
-   * Get preferred partitioner. Return `defaultPartitionedRegionPartitioner` if no
-   * preference is specified.
-   */
-  private def preferredPartitioner = 
-    GeodeRDDPartitioner(opConf.getOrElse(
-      PreferredPartitionerPropKey, GeodeRDDPartitioner.defaultPartitionedRegionPartitioner.name))
-
-  /** materialize a RDD partition */
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val partition = split.asInstanceOf[GeodeRDDPartition]
-    logDebug(s"compute RDD id=${this.id} partition $partition")
-    connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition)
-    // new InterruptibleIterator(context, split.asInstanceOf[GeodeRDDPartition[K, V]].iterator)
-  }
-}
-
-object GeodeRegionRDD {
-
-  def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String,
-    connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty)
-    : GeodeRegionRDD[K, V] =
-    new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
-
-}
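
A sketch of exposing a region as an RDD through the companion apply and the where filter; the locator, the portfolios region (assumed Region[String, String]), and the OQL predicate are illustrative only.

    import org.apache.spark.SparkContext
    import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey}
    import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD

    object RegionRDDSketch {
      def read(sc: SparkContext): Unit = {
        val connConf = GeodeConnectionConf("localhost[10334]")   // placeholder locator
        val rdd = GeodeRegionRDD[String, String](
          sc, "portfolios", connConf, Map(PreferredPartitionerPropKey -> "ServerSplits"))
        // pushed down as: select key, value from /portfolios.entries where value like 'tech%'
        val filtered = rdd.where("value like 'tech%'")
        println(filtered.count())
      }
    }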

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
deleted file mode 100644
index f859173..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi
-
-import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
-import org.apache.spark.api.java.JavaPairRDD
-
-class GeodeJavaRegionRDD[K, V](rdd: GeodeRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) {
-  
-  def where(whereClause: String): GeodeJavaRegionRDD[K, V] = new GeodeJavaRegionRDD(rdd.where(whereClause))
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
deleted file mode 100644
index ffa6195..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.javaapi
-
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream}
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-/**
- *  A helper class to make it possible to access components written in Scala from Java code.
- */
-private[connector] object JavaAPIHelper {
-
-  /** Returns a `ClassTag` of a given runtime class. */
-  def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz)
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   * see JavaSparkContext.fakeClassTag in Spark for more info.
-   */
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-  /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. */
-  def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] =
-    Map(props.toSeq: _*)
-
-  /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */
-  def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] =
-    JavaPairRDD.fromJavaRDD(rdd)
-
-  /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */
-  def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] =
-    JavaPairDStream.fromJavaDStream(ds)
-
-  /** an empty Map[String, String] for default opConf **/
-  val emptyStrStrMap: Map[String, String] = Map.empty
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
deleted file mode 100644
index 6f9a780..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark
-
-import io.pivotal.geode.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.reflect.ClassTag
-
-/**
- * The root package of Geode connector for Apache Spark.
- * Provides handy implicit conversions that add geode-specific
- * methods to `SparkContext` and `RDD`.
- */
-package object connector {
-
-  /** constants */
-  final val GeodeLocatorPropKey = "spark.geode.locators"
-  // partitioner related keys and values
-  final val PreferredPartitionerPropKey = "preferred.partitioner"
-  final val NumberPartitionsPerServerPropKey = "number.partitions.per.server"
-  final val OnePartitionPartitionerName = OnePartitionPartitioner.name
-  final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name
-
-  final val RDDSaveBatchSizePropKey = "rdd.save.batch.size"
-  final val RDDSaveBatchSizeDefault = 10000
-  
-  /** implicits */
-  
-  implicit def toSparkContextFunctions(sc: SparkContext): GeodeSparkContextFunctions =
-    new GeodeSparkContextFunctions(sc)
-
-  implicit def toSQLContextFunctions(sqlContext: SQLContext): GeodeSQLContextFunctions =
-    new GeodeSQLContextFunctions(sqlContext)
-
-  implicit def toGeodePairRDDFunctions[K: ClassTag, V: ClassTag]
-    (self: RDD[(K, V)]): GeodePairRDDFunctions[K, V] = new GeodePairRDDFunctions(self)
-
-  implicit def toGeodeRDDFunctions[T: ClassTag]
-    (self: RDD[T]): GeodeRDDFunctions[T] = new GeodeRDDFunctions(self)
-
-  /** utility implicits */
-  
-  /** convert Map[String, String] to java.util.Properties */
-  implicit def map2Properties(map: Map[String,String]): java.util.Properties =
-    (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props}
-
-  /** internal util methods */
-  
-  private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n  "): String =
-    rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep)
-
-}
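
A sketch of the implicit entry points above; GeodeLocatorPropKey and the conversions are taken from this package object, while the saveToGeode call is an assumption about GeodePairRDDFunctions (not shown in this commit), mirroring the DStream saveToGeode further down.

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // property keys and implicit conversions

    object ConnectorImplicitsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("geode-connector-sketch")
          .set(GeodeLocatorPropKey, "localhost[10334]")   // "spark.geode.locators", placeholder value
        val sc = new SparkContext(conf)

        val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2))
        // toGeodePairRDDFunctions applies here; saveToGeode is assumed to exist on the wrapper
        pairs.saveToGeode("counts", GeodeConnectionConf(conf))
        sc.stop()
      }
    }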

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
deleted file mode 100644
index 4d46429..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.streaming
-
-import io.pivotal.geode.spark.connector.GeodeConnectionConf
-import io.pivotal.geode.spark.connector.internal.rdd.{GeodePairRDDWriter, GeodeRDDWriter}
-import org.apache.spark.Logging
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Extra geode functions on DStream of non-pair elements through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GeodeDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging {
-
-  /**
-   * Save the DStream of non-pair elements to Geode key-value store.
-   * @param regionPath the full path of region that the DStream is stored
-   * @param func the function that converts elements of the DStream to key/value pairs
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode[K, V](
-      regionPath: String, 
-      func: T => (K, V), 
-      connConf: GeodeConnectionConf = defaultConnectionConf, 
-      opConf: Map[String, String] = Map.empty): Unit = {
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
-    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
-    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _))
-  }
-
-  /** this version of saveToGeode is just for Java API */
-  def saveToGeode[K, V](
-      regionPath: String,
-      func: PairFunction[T, K, V],
-      connConf: GeodeConnectionConf,
-      opConf: Map[String, String] ): Unit = {
-    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(dstream.context.sparkContext.getConf)
-}
-
-
-/**
- * Extra geode functions on DStream of (key, value) pairs through an implicit conversion.
- * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to
- * use these functions.
- */
-class GeodePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging {
-
-  /**
-   * Save the DStream of pairs to Geode key-value store without any conversion
-   * @param regionPath the full path of region that the DStream is stored
-   * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster
-   * @param opConf the optional parameters for this operation
-   */
-  def saveToGeode(
-      regionPath: String, 
-      connConf: GeodeConnectionConf = defaultConnectionConf, 
-      opConf: Map[String, String] = Map.empty): Unit = {
-    connConf.getConnection.validateRegion[K, V](regionPath)
-    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
-    logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""")
-    dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _))
-  }
-
-  private[connector] def defaultConnectionConf: GeodeConnectionConf =
-    GeodeConnectionConf(dstream.context.sparkContext.getConf)
-}
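
A sketch of the streaming save path, following the import advice in the scaladoc above; the socket source, region name, and locator are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import io.pivotal.geode.spark.connector.GeodeConnectionConf
    import io.pivotal.geode.spark.connector.streaming._   // implicit DStream conversions

    object DStreamSaveSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("dstream-to-geode")
          .set("spark.geode.locators", "localhost[10334]")   // placeholder locator
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val connConf = GeodeConnectionConf(sparkConf)

        val lines = ssc.socketTextStream("localhost", 9999)
        // non-pair DStream: each line is stored as (line, line.length) in the "lineLengths" region
        lines.saveToGeode[String, Int]("lineLengths", line => (line, line.length), connConf)

        ssc.start()
        ssc.awaitTermination()
      }
    }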

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
deleted file mode 100644
index 0d1f1eb..0000000
--- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector
-
-import org.apache.spark.streaming.dstream.DStream
-
-/**
- * Provides handy implicit conversions that add gemfire-specific methods to `DStream`.
- */
-package object streaming {
-
-  implicit def toGeodeDStreamFunctions[T](ds: DStream[T]): GeodeDStreamFunctions[T] =
-    new GeodeDStreamFunctions[T](ds)
-
-  implicit def toGeodePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GeodePairDStreamFunctions[K, V] =
-    new GeodePairDStreamFunctions[K, V](ds)
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
new file mode 100644
index 0000000..6c1df67
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.geode.cache.execute.ResultCollector
+import org.apache.geode.cache.query.Query
+import org.apache.geode.cache.Region
+import io.pivotal.geode.spark.connector.internal.RegionMetadata
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+
+
+trait GeodeConnection {
+
+  /**
+   * Validate region existence and key/value type constraints, throw RuntimeException
+   * if region does not exist or key and/or value type do(es) not match.
+   * @param regionPath the full path of region
+   */
+  def validateRegion[K, V](regionPath: String): Unit
+
+  /**
+   * Get Region proxy for the given region
+   * @param regionPath the full path of region
+   */
+  def getRegionProxy[K, V](regionPath: String): Region[K, V]
+
+  /**
+   * Retrieve region meta data for the given region. 
+   * @param regionPath: the full path of the region
+   * @return Some[RegionMetadata] if region exists, None otherwise
+   */
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata]
+
+  /** 
+   * Retrieve region data for the given region and bucket set 
+   * @param regionPath: the full path of the region
+   * @param whereClause: the optional OQL where clause used to filter the data
+   * @param split: Geode RDD Partition instance
+   */
+  def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)]
+
+  def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object
+  /** 
+   * Create a geode OQL query
+   * @param queryString Geode OQL query string
+   */
+  def getQuery(queryString: String): Query
+
+  /** Close the connection */
+  def close(): Unit
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
new file mode 100644
index 0000000..38d9e07
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import org.apache.spark.SparkConf
+import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper}
+
+/**
+ * Stores configuration of a connection to Geode cluster. It is serializable and can
+ * be safely sent over network.
+ *
+ * @param locators Geode locator host:port pairs, the default is (localhost,10334)
+ * @param geodeProps The initial geode properties to be used.
+ * @param connectionManager GeodeConnectionFactory instance
+ */
+class GeodeConnectionConf(
+   val locators: Seq[(String, Int)], 
+   val geodeProps: Map[String, String] = Map.empty,
+   connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager
+  ) extends Serializable {
+
+  /** require at least 1 pair of (host,port) */
+  require(locators.nonEmpty)
+  
+  def getConnection: GeodeConnection = connectionManager.getConnection(this)
+  
+}
+
+object GeodeConnectionConf {
+
+  /**
+   * create GeodeConnectionConf object based on locator string and optional GeodeConnectionFactory
+   * @param locatorStr Geode cluster locator string
+   * @param connectionManager GeodeConnection factory
+   */
+  def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty)
+    (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = {
+    new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager)
+  }
+
+  /**
+   * create GeodeConnectionConf object based on SparkConf. Note that an implicit can
+   * be used to control which GeodeConnectionFactory instance to use if desired
+   * @param conf a SparkConf instance 
+   */
+  def apply(conf: SparkConf): GeodeConnectionConf = {
+    val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse(
+      throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey"))
+    // SparkConf only holds properties whose key starts with "spark.". In order to
+    // put geode properties in SparkConf, all geode properties are prefixed with
+    // "spark.geode.". This prefix is removed before the properties are put in `geodeProps`
+    val prefix = "spark.geode."
+    val geodeProps = conf.getAll.filter {
+        case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey
+      }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap
+    apply(locatorStr, geodeProps)
+  }
+
+}
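
A sketch of the two apply overloads above; the locator string syntax accepted by LocatorHelper.parseLocatorsString is assumed to be host[port] here, and the extra property is a placeholder.

    import org.apache.spark.SparkConf
    import io.pivotal.geode.spark.connector.{GeodeConnectionConf, GeodeLocatorPropKey}

    object ConnectionConfSketch extends App {
      // 1. directly from a locator string (format assumed: host[port],host[port],...)
      val fromString = GeodeConnectionConf("locator1[10334],locator2[10334]")

      // 2. from SparkConf: keys under "spark.geode." are kept with the prefix stripped
      val sparkConf = new SparkConf()
        .set(GeodeLocatorPropKey, "locator1[10334]")
        .set("spark.geode.security-client-auth-init", "com.example.AuthInit.create")   // placeholder
      val fromSparkConf = GeodeConnectionConf(sparkConf)
      println(fromSparkConf.geodeProps)   // Map(security-client-auth-init -> com.example.AuthInit.create)
    }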

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
new file mode 100644
index 0000000..bf678f0
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+/**
+ * GeodeConnectionFactory provides a common interface that manages Geode
+ * connections, and it's serializable. Each factory instance will handle
+ * connection instance creation and connection pool management.
+ */
+trait GeodeConnectionManager extends Serializable {
+
+  /** get connection for the given connector */
+  def getConnection(connConf: GeodeConnectionConf): GeodeConnection
+
+  /** close the connection */
+  def closeConnection(connConf: GeodeConnectionConf): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
new file mode 100644
index 0000000..6e93b05
--- /dev/null
+++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import java.io.File
+import java.net.URL
+import org.apache.commons.httpclient.methods.PostMethod
+import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.spark.Logging
+
+object GeodeFunctionDeployer {
+  def main(args: Array[String]) {
+    new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args)
+  }
+}
+
+class GeodeFunctionDeployer(val httpClient:HttpClient) extends Logging {
+
+  def deploy(host: String, port: Int, jarLocation: String): String =
+    deploy(host + ":" + port, jarLocation)
+  
+  def deploy(host: String, port: Int, jar:File): String =
+    deploy(host + ":" + port, jar)
+  
+  def deploy(jmxHostAndPort: String, jarLocation: String): String =
+    deploy(jmxHostAndPort, jarFileHandle(jarLocation))
+  
+  def deploy(jmxHostAndPort: String, jar: File): String = {
+    val urlString = constructURLString(jmxHostAndPort)
+    val filePost: PostMethod = new PostMethod(urlString)
+    val parts: Array[Part] = new Array[Part](1)
+    parts(0) = new FilePart("resources", jar)
+    filePost.setRequestEntity(new MultipartRequestEntity(parts, 
filePost.getParams))
+    val status: Int = httpClient.executeMethod(filePost)
+    "Deployed Jar with status:" + status
+  }
+
+  private[connector] def constructURLString(jmxHostAndPort: String) =
+    "http://"; + jmxHostAndPort + "/gemfire/v1/deployed"
+
+  private[connector] def jarFileHandle(jarLocation: String) = {
+    val f: File = new File(jarLocation)
+    if (!f.exists()) {
+      val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath
+      logInfo(errorMessage)
+      throw new RuntimeException(errorMessage)
+    }
+    f
+  }
+  
+  def commandLineRun(args: Array[String]):Unit = {
+    val (hostPort: String, jarFile: String) =
+    if (args.length < 2) {
+      logInfo("JMX Manager Host and Port (example: localhost:7070):")
+      val bufferedReader = new java.io.BufferedReader(new 
java.io.InputStreamReader(System.in))
+      val jmxHostAndPort = bufferedReader.readLine()
+      logInfo("Location of geode-functions.jar:")
+      val functionJarLocation = bufferedReader.readLine()
+      (jmxHostAndPort, functionJarLocation)
+    } else {
+      (args(0), args(1))
+    }
+    val status = deploy(hostPort, jarFile)
+    logInfo(status)
+  }
+}
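
A minimal usage sketch for the deployer above; the JMX manager address and jar
path are illustrative assumptions:

    import org.apache.commons.httpclient.HttpClient
    import io.pivotal.geode.spark.connector.GeodeFunctionDeployer

    // Deploy the server-side functions jar through the /gemfire/v1/deployed endpoint
    // built by constructURLString above.
    val deployer = new GeodeFunctionDeployer(new HttpClient())
    val status = deployer.deploy("localhost:7070", "/path/to/geode-functions.jar")
    println(status)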

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
new file mode 100644
index 0000000..0bf7df5
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import com.esotericsoftware.kryo.Kryo
+import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer
+import org.apache.spark.serializer.KryoRegistrator
+import org.apache.geode.cache.query.internal.Undefined
+
+class GeodeKryoRegistrator extends KryoRegistrator {
+
+  override def registerClasses(kryo: Kryo): Unit = {
+    kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer])
+  }
+}
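
To make Spark use this registrator, the standard Kryo settings can be added to
the SparkConf; a sketch (the property keys are standard Spark settings):

    import org.apache.spark.SparkConf

    // Enable Kryo serialization and register the connector's serializers.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "io.pivotal.geode.spark.connector.GeodeKryoRegistrator")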

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
new file mode 100644
index 0000000..ba5d2df
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, 
GeodeJoinRDD, GeodePairRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to
+ * use these functions.
+ */
+class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable 
with Logging {
+
+  /**
+   * Save the RDD of pairs to the Geode key-value store without any conversion
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode(
+      regionPath: String, 
+      connConf: GeodeConnectionConf = defaultConnectionConf, 
+      opConf: Map[String, String] = Map.empty): Unit = {    
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  
${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write _)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in 
`this`
+   * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned
+   * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in 
the
+   * Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides connection to the Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[(K, V), V2]
+   */
+  def joinGeodeRegion[K2 <: K, V2](
+    regionPath: String, connConf: GeodeConnectionConf = 
defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = {
+    new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in 
`this` RDD
+   * and the Geode `Region[K2, V2]`. The join key from RDD element is generated by
+   * `func(K, V) => K2`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of result RDD will be returned as a ((k, v), v2) 
tuple,
+   * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[(K, V), V2]
+   */
+  def joinGeodeRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = 
defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] =
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+
+  /** This version of joinGeodeRegion(...) is just for Java API. */
+  private[connector] def joinGeodeRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: 
GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = {
+    new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`.
+   * For each element (k, v) in `this` RDD, the resulting RDD will either 
contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key k.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[ (K, V), Option[V2] ]
+   */
+  def outerJoinGeodeRegion[K2 <: K, V2](
+    regionPath: String, connConf: GeodeConnectionConf = 
defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`.
+   * The join key from RDD element is generated by `func(K, V) => K2`, and the
+   * key from the region is just the key of the key/value pair.
+   *
+   * For each element (k, v) in `this` RDD, the resulting RDD will either 
contain
+   * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair
+   * ((k, v), None) if no element in the Geode region has key `func(k, v)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates region key from RDD element (K, V)
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K2 the key type of the Geode region
+   * @tparam V2 the value type of the Geode region
+   * @return RDD[ (K, V), Option[V2] ]
+   */
+  def outerJoinGeodeRegion[K2, V2](
+    regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = 
defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
+  private[connector] def outerJoinGeodeRegion[K2, V2](
+    regionPath: String, func: Function[(K, V), K2], connConf: 
GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = {
+    new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
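
A minimal usage sketch for the pair-RDD functions above; the region name
"str_str_region" is illustrative, and the connector's locator setting is assumed
to already be present in the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // enables saveToGeode / joinGeodeRegion on pair RDDs

    val sc = new SparkContext(new SparkConf().setAppName("pair-rdd-demo"))
    val pairs = sc.parallelize(Seq(("1", "one"), ("2", "two")))
    pairs.saveToGeode("str_str_region")                                   // write pairs to the region
    val joined = pairs.joinGeodeRegion[String, String]("str_str_region")  // ((k, v), v2) tuples
    joined.collect().foreach(println)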

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
new file mode 100644
index 0000000..2e5c92a
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, 
GeodeJoinRDD, GeodeRDDWriter}
+import org.apache.spark.Logging
+import org.apache.spark.api.java.function.{PairFunction, Function}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Extra Geode functions on non-Pair RDDs through an implicit conversion.
+ * Import `io.pivotal.geode.spark.connector._` at the top of your program to 
+ * use these functions.  
+ */
+class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging {
+
+  /**
+   * Save the non-pair RDD to the Geode key-value store.
+   * @param regionPath the full path of the region where the RDD is stored
+   * @param func the function that converts elements of RDD to key/value pairs
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @param opConf the optional parameters for this operation
+   */
+  def saveToGeode[K, V](
+      regionPath: String, 
+      func: T => (K, V), 
+      connConf: GeodeConnectionConf = defaultConnectionConf,
+      opConf: Map[String, String] = Map.empty): Unit = {
+    connConf.getConnection.validateRegion[K, V](regionPath)
+    if (log.isDebugEnabled)
+      logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n  
${getRddPartitionsInfo(rdd)}""")
+    else
+      logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""")
+    val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf)
+    rdd.sparkContext.runJob(rdd, writer.write(func) _)
+  }
+
+  /** This version of saveToGeode(...) is just for Java API. */
+  private[connector] def saveToGeode[K, V](
+      regionPath: String, 
+      func: PairFunction[T, K, V], 
+      connConf: GeodeConnectionConf, 
+      opConf: Map[String, String]): Unit = {
+    saveToGeode[K, V](regionPath, func.call _, connConf, opConf)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in 
`this` RDD
+   * and the Geode `Region[K, V]`. The join key from RDD element is generated 
by
+   * `func(T) => K`, and the key from the Geode region is just the key of the
+   * key/value pair.
+   *
+   * Each pair of elements of the result RDD will be returned as a (t, v) tuple,
+   * where t is in `this` RDD and (k, v) is in the Geode region.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from RDD element T
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K the key type of the Geode region
+   * @tparam V the value type of the Geode region
+   * @return RDD[T, V]
+   */
+  def joinGeodeRegion[K, V](regionPath: String, func: T => K, 
+    connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, 
V] = {
+    new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf)    
+  }
+
+  /** This version of joinGeodeRegion(...) is just for Java API. */
+  private[connector] def joinGeodeRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): 
GeodeJoinRDD[T, K, V] = {
+    joinGeodeRegion(regionPath, func.call _, connConf)
+  }
+
+  /**
+   * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`.
+   * The join key from RDD element is generated by `func(T) => K`, and the
+   * key from region is just the key of the key/value pair.
+   *
+   * For each element (t) in `this` RDD, the resulting RDD will either contain
+   * all pairs (t, Some(v)) for v in the Geode region, or the pair
+   * (t, None) if no element in the Geode region has key `func(t)`.
+   *
+   * @param regionPath the region path of the Geode region
+   * @param func the function that generates the region key from RDD element T
+   * @param connConf the GeodeConnectionConf object that provides connection 
to Geode cluster
+   * @tparam K the key type of the Geode region
+   * @tparam V the value type of the Geode region
+   * @return RDD[ T, Option[V] ]
+   */
+  def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K,
+    connConf: GeodeConnectionConf = defaultConnectionConf): 
GeodeOuterJoinRDD[T, K, V] = {
+    new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf)
+  }
+
+  /** This version of outerJoinGeodeRegion(...) is just for Java API. */
+  private[connector] def outerJoinGeodeRegion[K, V](
+    regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): 
GeodeOuterJoinRDD[T, K, V] = {
+    outerJoinGeodeRegion(regionPath, func.call _, connConf)
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(rdd.sparkContext.getConf)
+
+}
+
+
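
A minimal usage sketch for the non-pair RDD functions above; the region name and
key function are illustrative, and locator settings are assumed to be present in
the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // enables saveToGeode(func) / joinGeodeRegion on plain RDDs

    val sc = new SparkContext(new SparkConf().setAppName("rdd-demo"))
    val words = sc.parallelize(Seq("spark", "geode", "connector"))
    words.saveToGeode("str_int_region", (w: String) => (w, w.length))    // word -> length pairs
    val joined = words.joinGeodeRegion[String, Int]("str_int_region", (w: String) => w)
    joined.collect().foreach(println)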

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
new file mode 100644
index 0000000..83aab7a
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * Provides Geode OQL-specific functions
+ */
+class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends 
Serializable with Logging {
+
+  /**
+   * Expose a Geode OQL query result as a DataFrame
+   * @param query the OQL query string.
+   */
+  def geodeOQL(
+    query: String,
+    connConf: GeodeConnectionConf = 
GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = {
+    logInfo(s"OQL query = $query")
+    val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf)
+    sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext))
+  }
+
+  private[connector] def defaultConnectionConf: GeodeConnectionConf =
+    GeodeConnectionConf(sqlContext.sparkContext.getConf)
+}
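
A minimal usage sketch for geodeOQL; the region "customers" and its fields are
illustrative, and the connector's locator setting is assumed to be present in
the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import io.pivotal.geode.spark.connector._   // enables sqlContext.geodeOQL

    val sc = new SparkContext(new SparkConf().setAppName("oql-demo"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.geodeOQL("SELECT c.id, c.name FROM /customers c")  // OQL result as a DataFrame
    df.show()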

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala
new file mode 100644
index 0000000..617cb33
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector
+
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD
+import org.apache.spark.SparkContext
+
+import scala.reflect.ClassTag
+
+/** Provides Geode-specific methods on `SparkContext` */
+class GeodeSparkContextFunctions(@transient sc: SparkContext) extends 
Serializable {
+
+  /**
+   * Expose a Geode region as a GeodeRDD
+   * @param regionPath the full path of the region
+   * @param connConf the GeodeConnectionConf that can be used to access the 
region
+   * @param opConf use this to specify the preferred partitioner
+   *        and its parameters; the implementation will use it if applicable
+   */
+  def geodeRegion[K: ClassTag, V: ClassTag] (
+    regionPath: String, connConf: GeodeConnectionConf = 
GeodeConnectionConf(sc.getConf),
+    opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] =
+    GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf)
+
+}
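
A minimal usage sketch for geodeRegion; the region "emps" is illustrative, and
locator settings are assumed to be present in the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // enables sc.geodeRegion

    val sc = new SparkContext(new SparkConf().setAppName("region-rdd-demo"))
    val emps = sc.geodeRegion[String, String]("emps")   // expose the region as an RDD[(String, String)]
    println(emps.count())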

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala
 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala
new file mode 100644
index 0000000..b232712
--- /dev/null
+++ 
b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal
+
+import java.net.InetAddress
+
+import org.apache.geode.cache.client.{ClientCache, ClientCacheFactory, 
ClientRegionShortcut}
+import org.apache.geode.cache.execute.{FunctionException, FunctionService}
+import org.apache.geode.cache.query.Query
+import org.apache.geode.cache.{Region, RegionService}
+import org.apache.geode.internal.cache.execute.InternalExecution
+import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector
+import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition
+import org.apache.spark.{SparkEnv, Logging}
+import io.pivotal.geode.spark.connector.GeodeConnection
+import io.pivotal.geode.spark.connector.internal.geodefunctions._
+import java.util.{Set => JSet, List => JList }
+
+/**
+ * Default GeodeConnection implementation. Instances of this class should be
+ * created by DefaultGeodeConnectionFactory.
+ * @param locators pairs of host/port of locators
+ * @param gemFireProps The initial geode properties to be used.
+ */
+private[connector] class DefaultGeodeConnection (
+  locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) 
+  extends GeodeConnection with Logging {
+
+  private val clientCache = initClientCache()
+
+  /** Register Geode functions to the Geode cluster */
+  
FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance())
+  FunctionService.registerFunction(RetrieveRegionFunction.getInstance())
+
+  private def initClientCache() : ClientCache = {
+    try {
+      val ccf = getClientCacheFactory
+      ccf.create()
+    } catch {
+      case e: Exception =>
+        logError(s"""Failed to init ClientCache, 
locators=${locators.mkString(",")}, Error: $e""")
+        throw new RuntimeException(e)
+    }
+  }
+  
+  private def getClientCacheFactory: ClientCacheFactory = {
+    import io.pivotal.geode.spark.connector.map2Properties
+    val ccf = new ClientCacheFactory(gemFireProps)
+    ccf.setPoolReadTimeout(30000)
+    val servers = LocatorHelper.getAllGeodeServers(locators)
+    if (servers.isDefined && servers.get.size > 0) {
+      val sparkIp = System.getenv("SPARK_LOCAL_IP")
+      val hostName = if (sparkIp != null) 
InetAddress.getByName(sparkIp).getCanonicalHostName
+                     else InetAddress.getLocalHost.getCanonicalHostName
+      val executorId = SparkEnv.get.executorId      
+      val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, 
hostName, executorId)
+      logInfo(s"""Init ClientCache: severs=${pickedServers.mkString(",")}, 
host=$hostName executor=$executorId props=$gemFireProps""")
+      logDebug(s"""Init ClientCache: 
all-severs=${pickedServers.mkString(",")}""")
+      pickedServers.foreach{ case (host, port)  => ccf.addPoolServer(host, 
port) }
+    } else {
+      logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, 
props=$gemFireProps""")
+      locators.foreach { case (host, port)  => ccf.addPoolLocator(host, port) }
+    }
+    ccf
+  }
+
+  /** close the clientCache */
+  override def close(): Unit =
+    if (! clientCache.isClosed) clientCache.close()
+
+  /** ----------------------------------------- */
+  /** implementation of GeodeConnection trait */
+  /** ----------------------------------------- */
+
+  override def getQuery(queryString: String): Query =
+    
clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString)
+
+  override def validateRegion[K, V](regionPath: String): Unit = {
+    val md = getRegionMetadata[K, V](regionPath)
+    if (! md.isDefined) throw new RuntimeException(s"The region named 
$regionPath was not found")
+  }
+
+  def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    val region = getRegionProxy[K, V](regionPath)
+    val set0: JSet[Integer] = Set[Integer](0)
+    val exec = 
FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0)
+    exec.setWaitOnExceptionFlag(true)
+    try {
+      val collector = exec.execute(RetrieveRegionMetadataFunction.ID)
+      val r = collector.getResult.asInstanceOf[JList[RegionMetadata]]
+      logDebug(r.get(0).toString)
+      Some(r.get(0))
+    } catch {
+      case e: FunctionException => 
+        if (e.getMessage.contains(s"The region named /$regionPath was not 
found")) None
+        else throw e
+    }
+  }
+
+  def getRegionProxy[K, V](regionPath: String): Region[K, V] = {
+    val region1: Region[K, V] = 
clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]]
+    if (region1 != null) region1
+    else DefaultGeodeConnection.regionLock.synchronized {
+      val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, 
V]]
+      if (region2 != null) region2
+      else clientCache.createClientRegionFactory[K, 
V](ClientRegionShortcut.PROXY).create(regionPath)
+    }
+  }
+
+  override def getRegionData[K, V](regionPath: String, whereClause: 
Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = {
+    val region = getRegionProxy[K, V](regionPath)
+    val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", 
${split.index})"""
+    val args : Array[String] = Array[String](whereClause.getOrElse(""), desc)
+    val collector = new StructStreamingResultCollector(desc)
+        // RetrieveRegionResultCollector[(K, V)]
+    import scala.collection.JavaConversions.setAsJavaSet
+    val exec = 
FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(split.bucketSet.map(Integer.valueOf))
+    exec.setWaitOnExceptionFlag(true)
+    exec.execute(RetrieveRegionFunction.ID)
+    collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], 
objs(1).asInstanceOf[V])}
+  }
+
+  override def executeQuery(regionPath: String, bucketSet: Set[Int], 
queryString: String) = {
+    import scala.collection.JavaConversions.setAsJavaSet
+    FunctionService.registerFunction(QueryFunction.getInstance())
+    val collector = new QueryResultCollector
+    val region = getRegionProxy(regionPath)
+    val args: Array[String] = Array[String](queryString, bucketSet.toString)
+    val exec = 
FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution]
+      .withBucketFilter(bucketSet.map(Integer.valueOf))
+      .withArgs(args)
+    exec.execute(QueryFunction.ID)
+    collector.getResult
+  }
+}
+
+private[connector] object DefaultGeodeConnection {
+  /** a lock object only used by getRegionProxy...() */
+  private val regionLock = new Object
+}
+
+/** The purpose of this class is to make unit testing of DefaultGeodeConnectionManager easier. */
+class DefaultGeodeConnectionFactory {
+
+  def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, 
String] = Map.empty) =
+    new DefaultGeodeConnection(locators, gemFireProps)
+
+}
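
A minimal sketch of using the internal factory directly; the locator host/port
and region name are illustrative assumptions:

    import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionFactory

    // Connect via a locator, verify a region exists, then close the client cache.
    val conn = new DefaultGeodeConnectionFactory().newConnection(Seq(("localhost", 10334)))
    conn.validateRegion[String, String]("str_str_region")
    conn.close()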
