http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala deleted file mode 100644 index f971a3e..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeJoinRDD.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import org.apache.geode.cache.Region -import io.pivotal.geode.spark.connector.GeodeConnectionConf -import org.apache.spark.{TaskContext, Partition} -import org.apache.spark.rdd.RDD -import scala.collection.JavaConversions._ - -/** - * An `RDD[T, V]` that will represent the result of a join between `left` RDD[T] - * and the specified Geode Region[K, V]. - */ -class GeodeJoinRDD[T, K, V] private[connector] - ( left: RDD[T], - func: T => K, - val regionPath: String, - val connConf: GeodeConnectionConf - ) extends RDD[(T, V)](left.context, left.dependencies) { - - /** validate region existence when GeodeRDD object is created */ - validate() - - /** Validate region, and make sure it exists. 
*/ - private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) - - override protected def getPartitions: Array[Partition] = left.partitions - - override def compute(split: Partition, context: TaskContext): Iterator[(T, V)] = { - val region = connConf.getConnection.getRegionProxy[K, V](regionPath) - if (func == null) computeWithoutFunc(split, context, region) - else computeWithFunc(split, context, region) - } - - /** T is (K, V1) since there's no map function `func` */ - private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { - val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] - val leftKeys = leftPairs.map { case (k, v) => k}.toSet - // Note: get all will return (key, null) for non-exist entry, so remove those entries - val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} - leftPairs.filter{case (k, v) => rightPairs.contains(k)} - .map {case (k, v) => ((k, v).asInstanceOf[T], rightPairs.get(k).get)}.toIterator - } - - private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, V)] = { - val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) - val leftKeys = leftPairs.map { case (t, k) => k}.toSet - // Note: get all will return (key, null) for non-exist entry, so remove those entries - val rightPairs = region.getAll(leftKeys).filter { case (k, v) => v != null} - leftPairs.filter { case (t, k) => rightPairs.contains(k)}.map {case (t, k) => (t, rightPairs.get(k).get)}.toIterator - } -}
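Note: GeodeJoinRDD above and GeodeOuterJoinRDD below are internal classes; user code reaches them through the join operations that the connector's implicits add to pair RDDs (see GeodePairRDDFunctions and the connector package object later in this diff). A minimal usage sketch follows; the region name "customers", the key/value types and the locator value are assumptions, and the outer-join method name is inferred from GeodeOuterJoinRDD rather than shown in this excerpt.

import org.apache.spark.{SparkConf, SparkContext}
import io.pivotal.geode.spark.connector._        // pair-RDD implicits and GeodeLocatorPropKey

val conf = new SparkConf()
  .setAppName("geode-join-sketch")
  .set(GeodeLocatorPropKey, "localhost[10334]")  // locator host/port value assumed
val sc = new SparkContext(conf)

// (orderId, order) pairs on the Spark side; region /customers is assumed to exist
val orders = sc.parallelize(Seq(1 -> "order-a", 2 -> "order-b"))

// Inner join: each element becomes ((orderId, order), customer); per partition the
// keys are looked up with region.getAll, as in GeodeJoinRDD.computeWithoutFunc
val joined = orders.joinGeodeRegion[Int, String]("customers")

// Left outer join: ((orderId, order), Option(customer)), backed by GeodeOuterJoinRDD
val outer = orders.outerJoinGeodeRegion[Int, String]("customers")
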
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala deleted file mode 100644 index 04855c1..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeOuterJoinRDD.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import org.apache.geode.cache.Region -import io.pivotal.geode.spark.connector.GeodeConnectionConf -import org.apache.spark.{TaskContext, Partition} -import org.apache.spark.rdd.RDD -import scala.collection.JavaConversions._ - -/** - * An `RDD[ T, Option[V] ]` that represents the result of a left outer join - * between `left` RDD[T] and the specified Geode Region[K, V]. - */ -class GeodeOuterJoinRDD[T, K, V] private[connector] - ( left: RDD[T], - func: T => K, - val regionPath: String, - val connConf: GeodeConnectionConf - ) extends RDD[(T, Option[V])](left.context, left.dependencies) { - - /** validate region existence when GeodeRDD object is created */ - validate() - - /** Validate region, and make sure it exists. 
*/ - private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) - - override protected def getPartitions: Array[Partition] = left.partitions - - override def compute(split: Partition, context: TaskContext): Iterator[(T, Option[V])] = { - val region = connConf.getConnection.getRegionProxy[K, V](regionPath) - if (func == null) computeWithoutFunc(split, context, region) - else computeWithFunc(split, context, region) - } - - /** T is (K1, V1), and K1 and K are the same type since there's no map function `func` */ - private def computeWithoutFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { - val leftPairs = left.iterator(split, context).toList.asInstanceOf[List[(K, _)]] - val leftKeys = leftPairs.map { case (k, v) => k}.toSet - // Note: get all will return (key, null) for non-exist entry - val rightPairs = region.getAll(leftKeys) - // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option - leftPairs.map{ case (k, v) => ((k, v).asInstanceOf[T], Option(rightPairs.get(k))) }.toIterator - } - - private def computeWithFunc(split: Partition, context: TaskContext, region: Region[K, V]): Iterator[(T, Option[V])] = { - val leftPairs = left.iterator(split, context).toList.map(t => (t, func(t))) - val leftKeys = leftPairs.map { case (t, k) => k}.toSet - // Note: get all will return (key, null) for non-exist entry - val rightPairs = region.getAll(leftKeys) - // rightPairs is a java.util.Map, not scala map, so need to convert map.get() to Option - leftPairs.map{ case (t, k) => (t, Option(rightPairs.get(k)))}.toIterator - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala deleted file mode 100644 index 24fe72e..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartition.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import org.apache.spark.Partition - -/** - * This serializable class represents a GeodeRDD partition. Each partition is mapped - * to one or more buckets of region. The GeodeRDD can materialize the data of the - * partition based on all information contained here. 
- * @param partitionId partition id, a 0 based number. - * @param bucketSet region bucket id set for this partition. Set.empty means whole - * region (used for replicated region) - * @param locations preferred location for this partition - */ -case class GeodeRDDPartition ( - partitionId: Int, bucketSet: Set[Int], locations: Seq[String] = Nil) - extends Partition { - - override def index: Int = partitionId - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala deleted file mode 100644 index d960cab..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitioner.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import io.pivotal.geode.spark.connector.GeodeConnection -import io.pivotal.geode.spark.connector.internal.RegionMetadata -import org.apache.spark.{Logging, Partition} - -import scala.reflect.ClassTag - -/** - * A GeodeRDD partitioner is used to partition the region into multiple RDD partitions. - */ -trait GeodeRDDPartitioner extends Serializable { - - def name: String - - /** the function that generates partitions */ - def partitions[K: ClassTag, V: ClassTag] - (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] -} - -object GeodeRDDPartitioner extends Logging { - - /** To add new partitioner, just add it to the following list */ - final val partitioners: Map[String, GeodeRDDPartitioner] = - List(OnePartitionPartitioner, ServerSplitsPartitioner).map(e => (e.name, e)).toMap - - /** - * Get a partitioner based on given name, a default partitioner will be returned if there's - * no partitioner for the given name. 
- */ - def apply(name: String = defaultPartitionedRegionPartitioner.name): GeodeRDDPartitioner = { - val p = partitioners.get(name) - if (p.isDefined) p.get else { - logWarning(s"Invalid preferred partitioner name $name.") - defaultPartitionedRegionPartitioner - } - } - - val defaultReplicatedRegionPartitioner = OnePartitionPartitioner - - val defaultPartitionedRegionPartitioner = ServerSplitsPartitioner - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala deleted file mode 100644 index 4606114..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDPartitionerImpl.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import io.pivotal.geode.spark.connector.GeodeConnection -import io.pivotal.geode.spark.connector.internal.RegionMetadata -import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey -import org.apache.spark.Partition -import scala.collection.JavaConversions._ -import scala.collection.immutable.SortedSet -import scala.collection.mutable -import scala.reflect.ClassTag - -/** This partitioner maps whole region to one GeodeRDDPartition */ -object OnePartitionPartitioner extends GeodeRDDPartitioner { - - override val name = "OnePartition" - - override def partitions[K: ClassTag, V: ClassTag] - (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = - Array[Partition](new GeodeRDDPartition(0, Set.empty)) -} - -/** - * This partitioner maps whole region to N * M Geode RDD partitions, where M is the number of - * Geode servers that contain the data for the given region. Th default value of N is 1. 
- */ -object ServerSplitsPartitioner extends GeodeRDDPartitioner { - - override val name = "ServerSplits" - - override def partitions[K: ClassTag, V: ClassTag] - (conn: GeodeConnection, md: RegionMetadata, env: Map[String, String]): Array[Partition] = { - if (md == null) throw new RuntimeException("RegionMetadata is null") - val n = try { env.getOrElse(NumberPartitionsPerServerPropKey, "2").toInt } catch { case e: NumberFormatException => 2 } - if (!md.isPartitioned || md.getServerBucketMap == null || md.getServerBucketMap.isEmpty) - Array[Partition](new GeodeRDDPartition(0, Set.empty)) - else { - val map = mapAsScalaMap(md.getServerBucketMap) - .map { case (srv, set) => (srv, asScalaSet(set).map(_.toInt)) }.toList - .map { case (srv, set) => (srv.getHostName, set) } - doPartitions(map, md.getTotalBuckets, n) - } - } - - /** Converts server to bucket ID set list to array of RDD partitions */ - def doPartitions(serverBucketMap: List[(String, mutable.Set[Int])], totalBuckets: Int, n: Int) - : Array[Partition] = { - - // method that calculates the group size for splitting "k" items into "g" groups - def groupSize(k: Int, g: Int): Int = scala.math.ceil(k / g.toDouble).toInt - - // 1. convert list of server and bucket set pairs to a list of server and sorted bucket set pairs - val srvToSortedBucketSet = serverBucketMap.map { case (srv, set) => (srv, SortedSet[Int]() ++ set) } - - // 2. split bucket set of each server into n splits if possible, and server to Seq(server) - val srvToSplitedBuckeSet = srvToSortedBucketSet.flatMap { case (host, set) => - if (set.isEmpty) Nil else set.grouped(groupSize(set.size, n)).toList.map(s => (Seq(host), s)) } - - // 3. calculate empty bucket IDs by removing all bucket sets of all servers from the full bucket sets - val emptyIDs = SortedSet[Int]() ++ ((0 until totalBuckets).toSet /: srvToSortedBucketSet) {case (s1, (k, s2)) => s1 &~ s2} - - // 4. distribute empty bucket IDs to all partitions evenly. - // The empty buckets do not contain data when partitions are created, but they may contain data - // when RDD is materialized, so need to include those bucket IDs in the partitions. - val srvToFinalBucketSet = if (emptyIDs.isEmpty) srvToSplitedBuckeSet - else srvToSplitedBuckeSet.zipAll( - emptyIDs.grouped(groupSize(emptyIDs.size, srvToSplitedBuckeSet.size)).toList, (Nil, Set.empty), Set.empty).map - { case ((server, set1), set2) => (server, SortedSet[Int]() ++ set1 ++ set2) } - - // 5. create array of partitions w/ 0-based index - (0 until srvToFinalBucketSet.size).toList.zip(srvToFinalBucketSet).map - { case (i, (srv, set)) => new GeodeRDDPartition(i, set, srv) }.toArray - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala deleted file mode 100644 index 27559f3..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRDDWriter.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import org.apache.geode.cache.Region -import io.pivotal.geode.spark.connector._ -import org.apache.spark.{Logging, TaskContext} - -import scala.collection.Iterator -import java.util.{HashMap => JMap} - -/** This trait provide some common code for pair and non-pair RDD writer */ -private[rdd] abstract class GeodeRDDWriterBase (opConf: Map[String, String]) extends Serializable { - - val batchSize = try { opConf.getOrElse(RDDSaveBatchSizePropKey, RDDSaveBatchSizeDefault.toString).toInt} - catch { case e: NumberFormatException => RDDSaveBatchSizeDefault } - - def mapDump(map: Map[_, _], num: Int): String = { - val firstNum = map.take(num + 1) - if (firstNum.size > num) s"$firstNum ..." else s"$firstNum" - } -} - -/** - * Writer object that provides write function that saves non-pair RDD partitions to Geode. - * Those functions will be executed on Spark executors. - * @param regionPath the full path of the region where the data is written to - */ -class GeodeRDDWriter[T, K, V] - (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) - extends GeodeRDDWriterBase(opConf) with Serializable with Logging { - - def write(func: T => (K, V))(taskContext: TaskContext, data: Iterator[T]): Unit = { - val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) - var count = 0 - val chunks = data.grouped(batchSize) - chunks.foreach { chunk => - val map = chunk.foldLeft(new JMap[K, V]()){case (m, t) => val (k, v) = func(t); m.put(k, v); m} - region.putAll(map) - count += chunk.length - } - logDebug(s"$count entries (batch.size = $batchSize) are saved to region $regionPath") - } -} - - -/** - * Writer object that provides write function that saves pair RDD partitions to Geode. - * Those functions will be executed on Spark executors. 
- * @param regionPath the full path of the region where the data is written to - */ -class GeodePairRDDWriter[K, V] - (regionPath: String, connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) - extends GeodeRDDWriterBase(opConf) with Serializable with Logging { - - def write(taskContext: TaskContext, data: Iterator[(K, V)]): Unit = { - val region: Region[K, V] = connConf.getConnection.getRegionProxy[K, V](regionPath) - var count = 0 - val chunks = data.grouped(batchSize) - chunks.foreach { chunk => - val map = chunk.foldLeft(new JMap[K, V]()){case (m, (k,v)) => m.put(k,v); m} - region.putAll(map) - count += chunk.length - } - logDebug(s"$count entries (batch.batch = $batchSize) are saved to region $regionPath") - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala deleted file mode 100644 index 6980c0f..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/internal/rdd/GeodeRegionRDD.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.rdd - -import scala.collection.Seq -import scala.reflect.ClassTag -import org.apache.spark.rdd.RDD -import org.apache.spark.{TaskContext, Partition, SparkContext} -import io.pivotal.geode.spark.connector.{GeodeConnectionConf, PreferredPartitionerPropKey} -import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._ - -/** - * This class exposes Geode region as a RDD. - * @param sc the Spark Context - * @param regionPath the full path of the region - * @param connConf the GeodeConnectionConf to access the region - * @param opConf the parameters for this operation, such as preferred partitioner. - */ -class GeodeRegionRDD[K, V] private[connector] - (@transient sc: SparkContext, - val regionPath: String, - val connConf: GeodeConnectionConf, - val opConf: Map[String, String] = Map.empty, - val whereClause: Option[String] = None - ) (implicit ctk: ClassTag[K], ctv: ClassTag[V]) - extends RDD[(K, V)](sc, Seq.empty) { - - /** validate region existence when GeodeRDD object is created */ - validate() - - /** Validate region, and make sure it exists. 
*/ - private def validate(): Unit = connConf.getConnection.validateRegion[K, V](regionPath) - - def kClassTag = ctk - - def vClassTag = ctv - - /** - * method `copy` is used by method `where` that creates new immutable - * GeodeRDD instance based this instance. - */ - private def copy( - regionPath: String = regionPath, - connConf: GeodeConnectionConf = connConf, - opConf: Map[String, String] = opConf, - whereClause: Option[String] = None - ): GeodeRegionRDD[K, V] = { - - require(sc != null, - """RDD transformation requires a non-null SparkContext. Unfortunately - |SparkContext in this GeodeRDD is null. This can happen after - |GeodeRDD has been deserialized. SparkContext is not Serializable, - |therefore it deserializes to null. RDD transformations are not allowed - |inside lambdas used in other RDD transformations.""".stripMargin ) - - new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf, whereClause) - } - - /** When where clause is specified, OQL query - * `select key, value from /<region-path>.entries where <where clause> ` - * is used to filter the dataset. - */ - def where(whereClause: Option[String]): GeodeRegionRDD[K, V] = { - if (whereClause.isDefined) copy(whereClause = whereClause) - else this - } - - /** this version is for Java API that doesn't use scala.Option */ - def where(whereClause: String): GeodeRegionRDD[K, V] = { - if (whereClause == null || whereClause.trim.isEmpty) this - else copy(whereClause = Option(whereClause.trim)) - } - - /** - * Use preferred partitioner generate partitions. `defaultReplicatedRegionPartitioner` - * will be used if it's a replicated region. - */ - override def getPartitions: Array[Partition] = { - val conn = connConf.getConnection - val md = conn.getRegionMetadata[K, V](regionPath) - md match { - case None => throw new RuntimeException(s"region $regionPath was not found.") - case Some(data) => - logInfo(s"""RDD id=${this.id} region=$regionPath conn=${connConf.locators.mkString(",")}, env=$opConf""") - val p = if (data.isPartitioned) preferredPartitioner else defaultReplicatedRegionPartitioner - val splits = p.partitions[K, V](conn, data, opConf) - logDebug(s"""RDD id=${this.id} region=$regionPath partitions=\n ${splits.mkString("\n ")}""") - splits - } - } - - /** - * provide preferred location(s) (host name(s)) of the given partition. - * Only some partitioner implementation(s) provides this info, which is - * useful when Spark cluster and Geode cluster share some hosts. - */ - override def getPreferredLocations(split: Partition) = - split.asInstanceOf[GeodeRDDPartition].locations - - /** - * Get preferred partitioner. return `defaultPartitionedRegionPartitioner` if none - * preference is specified. 
- */ - private def preferredPartitioner = - GeodeRDDPartitioner(opConf.getOrElse( - PreferredPartitionerPropKey, GeodeRDDPartitioner.defaultPartitionedRegionPartitioner.name)) - - /** materialize a RDD partition */ - override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val partition = split.asInstanceOf[GeodeRDDPartition] - logDebug(s"compute RDD id=${this.id} partition $partition") - connConf.getConnection.getRegionData[K,V](regionPath, whereClause, partition) - // new InterruptibleIterator(context, split.asInstanceOf[GeodeRDDPartition[K, V]].iterator) - } -} - -object GeodeRegionRDD { - - def apply[K: ClassTag, V: ClassTag](sc: SparkContext, regionPath: String, - connConf: GeodeConnectionConf, opConf: Map[String, String] = Map.empty) - : GeodeRegionRDD[K, V] = - new GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala deleted file mode 100644 index f859173..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/GeodeJavaRegionRDD.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.javaapi - -import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD -import org.apache.spark.api.java.JavaPairRDD - -class GeodeJavaRegionRDD[K, V](rdd: GeodeRegionRDD[K, V]) extends JavaPairRDD[K, V](rdd)(rdd.kClassTag, rdd.vClassTag) { - - def where(whereClause: String): GeodeJavaRegionRDD[K, V] = new GeodeJavaRegionRDD(rdd.where(whereClause)) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala deleted file mode 100644 index ffa6195..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/javaapi/JavaAPIHelper.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.javaapi - -import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} -import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream} - -import scala.reflect.ClassTag -import scala.collection.JavaConversions._ - -/** - * A helper class to make it possible to access components written in Scala from Java code. - */ -private[connector] object JavaAPIHelper { - - /** Returns a `ClassTag` of a given runtime class. */ - def getClassTag[T](clazz: Class[T]): ClassTag[T] = ClassTag(clazz) - - /** - * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. - * see JavaSparkContext.fakeClassTag in Spark for more info. - */ - def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] - - /** Converts a Java `Properties` to a Scala immutable `Map[String, String]`. 
*/ - def propertiesToScalaMap[K, V](props: java.util.Properties): Map[String, String] = - Map(props.toSeq: _*) - - /** convert a JavaRDD[(K,V)] to JavaPairRDD[K,V] */ - def toJavaPairRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = - JavaPairRDD.fromJavaRDD(rdd) - - /** convert a JavaDStream[(K,V)] to JavaPairDStream[K,V] */ - def toJavaPairDStream[K, V](ds: JavaDStream[(K, V)]): JavaPairDStream[K, V] = - JavaPairDStream.fromJavaDStream(ds) - - /** an empty Map[String, String] for default opConf **/ - val emptyStrStrMap: Map[String, String] = Map.empty -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala deleted file mode 100644 index 6f9a780..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/package.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark - -import io.pivotal.geode.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext - -import scala.reflect.ClassTag - -/** - * The root package of Geode connector for Apache Spark. - * Provides handy implicit conversions that add geode-specific - * methods to `SparkContext` and `RDD`. 
- */ -package object connector { - - /** constants */ - final val GeodeLocatorPropKey = "spark.geode.locators" - // partitioner related keys and values - final val PreferredPartitionerPropKey = "preferred.partitioner" - final val NumberPartitionsPerServerPropKey = "number.partitions.per.server" - final val OnePartitionPartitionerName = OnePartitionPartitioner.name - final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name - - final val RDDSaveBatchSizePropKey = "rdd.save.batch.size" - final val RDDSaveBatchSizeDefault = 10000 - - /** implicits */ - - implicit def toSparkContextFunctions(sc: SparkContext): GeodeSparkContextFunctions = - new GeodeSparkContextFunctions(sc) - - implicit def toSQLContextFunctions(sqlContext: SQLContext): GeodeSQLContextFunctions = - new GeodeSQLContextFunctions(sqlContext) - - implicit def toGeodePairRDDFunctions[K: ClassTag, V: ClassTag] - (self: RDD[(K, V)]): GeodePairRDDFunctions[K, V] = new GeodePairRDDFunctions(self) - - implicit def toGeodeRDDFunctions[T: ClassTag] - (self: RDD[T]): GeodeRDDFunctions[T] = new GeodeRDDFunctions(self) - - /** utility implicits */ - - /** convert Map[String, String] to java.util.Properties */ - implicit def map2Properties(map: Map[String,String]): java.util.Properties = - (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} - - /** internal util methods */ - - private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String = - rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala deleted file mode 100644 index 4d46429..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/GeodeDStreamFunctions.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.streaming - -import io.pivotal.geode.spark.connector.GeodeConnectionConf -import io.pivotal.geode.spark.connector.internal.rdd.{GeodePairRDDWriter, GeodeRDDWriter} -import org.apache.spark.Logging -import org.apache.spark.api.java.function.PairFunction -import org.apache.spark.streaming.dstream.DStream - -/** - * Extra geode functions on DStream of non-pair elements through an implicit conversion. - * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to - * use these functions. - */ -class GeodeDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging { - - /** - * Save the DStream of non-pair elements to Geode key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the function that converts elements of the DStream to key/value pairs - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the optional parameters for this operation - */ - def saveToGeode[K, V]( - regionPath: String, - func: T => (K, V), - connConf: GeodeConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf) - logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") - dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _)) - } - - /** this version of saveToGeode is just for Java API */ - def saveToGeode[K, V]( - regionPath: String, - func: PairFunction[T, K, V], - connConf: GeodeConnectionConf, - opConf: Map[String, String] ): Unit = { - saveToGeode[K, V](regionPath, func.call _, connConf, opConf) - } - - private[connector] def defaultConnectionConf: GeodeConnectionConf = - GeodeConnectionConf(dstream.context.sparkContext.getConf) -} - - -/** - * Extra geode functions on DStream of (key, value) pairs through an implicit conversion. - * Import `io.pivotal.geode.spark.connector.streaming._` at the top of your program to - * use these functions. 
- */ -class GeodePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging { - - /** - * Save the DStream of pairs to Geode key-value store without any conversion - * @param regionPath the full path of region that the DStream is stored - * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster - * @param opConf the optional parameters for this operation - */ - def saveToGeode( - regionPath: String, - connConf: GeodeConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf) - logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") - dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _)) - } - - private[connector] def defaultConnectionConf: GeodeConnectionConf = - GeodeConnectionConf(dstream.context.sparkContext.getConf) -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala deleted file mode 100644 index 0d1f1eb..0000000 --- a/geode-spark-connector/geode-spark-connector/src/main/scala/io/pivotal/geode/spark/connector/streaming/package.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector - -import org.apache.spark.streaming.dstream.DStream - -/** - * Provides handy implicit conversions that add gemfire-specific methods to `DStream`. 
- */ -package object streaming { - - implicit def toGeodeDStreamFunctions[T](ds: DStream[T]): GeodeDStreamFunctions[T] = - new GeodeDStreamFunctions[T](ds) - - implicit def toGeodePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GeodePairDStreamFunctions[K, V] = - new GeodePairDStreamFunctions[K, V](ds) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala new file mode 100644 index 0000000..6c1df67 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnection.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import org.apache.geode.cache.execute.ResultCollector +import org.apache.geode.cache.query.Query +import org.apache.geode.cache.Region +import io.pivotal.geode.spark.connector.internal.RegionMetadata +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition + + +trait GeodeConnection { + + /** + * Validate region existence and key/value type constraints, throw RuntimeException + * if region does not exist or key and/or value type do(es) not match. + * @param regionPath the full path of region + */ + def validateRegion[K, V](regionPath: String): Unit + + /** + * Get Region proxy for the given region + * @param regionPath the full path of region + */ + def getRegionProxy[K, V](regionPath: String): Region[K, V] + + /** + * Retrieve region meta data for the given region. 
+ * @param regionPath: the full path of the region + * @return Some[RegionMetadata] if region exists, None otherwise + */ + def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] + + /** + * Retrieve region data for the given region and bucket set + * @param regionPath: the full path of the region + * @param whereClause: the set of bucket IDs + * @param split: Geode RDD Partition instance + */ + def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] + + def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String): Object + /** + * Create a geode OQL query + * @param queryString Geode OQL query string + */ + def getQuery(queryString: String): Query + + /** Close the connection */ + def close(): Unit +} + + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala new file mode 100644 index 0000000..38d9e07 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionConf.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import org.apache.spark.SparkConf +import io.pivotal.geode.spark.connector.internal.{DefaultGeodeConnectionManager, LocatorHelper} + +/** + * Stores configuration of a connection to Geode cluster. It is serializable and can + * be safely sent over network. + * + * @param locators Geode locator host:port pairs, the default is (localhost,10334) + * @param geodeProps The initial geode properties to be used. 
+ * @param connectionManager GeodeConnectionFactory instance + */ +class GeodeConnectionConf( + val locators: Seq[(String, Int)], + val geodeProps: Map[String, String] = Map.empty, + connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager + ) extends Serializable { + + /** require at least 1 pair of (host,port) */ + require(locators.nonEmpty) + + def getConnection: GeodeConnection = connectionManager.getConnection(this) + +} + +object GeodeConnectionConf { + + /** + * create GeodeConnectionConf object based on locator string and optional GeodeConnectionFactory + * @param locatorStr Geode cluster locator string + * @param connectionManager GeodeConnection factory + */ + def apply(locatorStr: String, geodeProps: Map[String, String] = Map.empty) + (implicit connectionManager: GeodeConnectionManager = new DefaultGeodeConnectionManager): GeodeConnectionConf = { + new GeodeConnectionConf(LocatorHelper.parseLocatorsString(locatorStr), geodeProps, connectionManager) + } + + /** + * create GeodeConnectionConf object based on SparkConf. Note that implicit can + * be used to control what GeodeConnectionFactory instance to use if desired + * @param conf a SparkConf instance + */ + def apply(conf: SparkConf): GeodeConnectionConf = { + val locatorStr = conf.getOption(GeodeLocatorPropKey).getOrElse( + throw new RuntimeException(s"SparkConf does not contain property $GeodeLocatorPropKey")) + // SparkConf only holds properties whose key starts with "spark.", In order to + // put geode properties in SparkConf, all geode properties are prefixes with + // "spark.geode.". This prefix was removed before the properties were put in `geodeProp` + val prefix = "spark.geode." + val geodeProps = conf.getAll.filter { + case (k, v) => k.startsWith(prefix) && k != GeodeLocatorPropKey + }.map { case (k, v) => (k.substring(prefix.length), v) }.toMap + apply(locatorStr, geodeProps) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala new file mode 100644 index 0000000..bf678f0 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeConnectionManager.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector + +/** + * GeodeConnectionFactory provide an common interface that manages Geode + * connections, and it's serializable. Each factory instance will handle + * connection instance creation and connection pool management. + */ +trait GeodeConnectionManager extends Serializable { + + /** get connection for the given connector */ + def getConnection(connConf: GeodeConnectionConf): GeodeConnection + + /** close the connection */ + def closeConnection(connConf: GeodeConnectionConf): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala new file mode 100644 index 0000000..6e93b05 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeFunctionDeployer.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector + +import java.io.File +import java.net.URL +import org.apache.commons.httpclient.methods.PostMethod +import org.apache.commons.httpclient.methods.multipart.{FilePart, Part, MultipartRequestEntity} +import org.apache.commons.httpclient.HttpClient +import org.apache.spark.Logging + +object GeodeFunctionDeployer { + def main(args: Array[String]) { + new GeodeFunctionDeployer(new HttpClient()).commandLineRun(args) + } +} + +class GeodeFunctionDeployer(val httpClient:HttpClient) extends Logging { + + def deploy(host: String, port: Int, jarLocation: String): String = + deploy(host + ":" + port, jarLocation) + + def deploy(host: String, port: Int, jar:File): String = + deploy(host + ":" + port, jar) + + def deploy(jmxHostAndPort: String, jarLocation: String): String = + deploy(jmxHostAndPort, jarFileHandle(jarLocation)) + + def deploy(jmxHostAndPort: String, jar: File): String = { + val urlString = constructURLString(jmxHostAndPort) + val filePost: PostMethod = new PostMethod(urlString) + val parts: Array[Part] = new Array[Part](1) + parts(0) = new FilePart("resources", jar) + filePost.setRequestEntity(new MultipartRequestEntity(parts, filePost.getParams)) + val status: Int = httpClient.executeMethod(filePost) + "Deployed Jar with status:" + status + } + + private[connector] def constructURLString(jmxHostAndPort: String) = + "http://" + jmxHostAndPort + "/gemfire/v1/deployed" + + private[connector]def jarFileHandle(jarLocation: String) = { + val f: File = new File(jarLocation) + if (!f.exists()) { + val errorMessage: String = "Invalid jar file:" + f.getAbsolutePath + logInfo(errorMessage) + throw new RuntimeException(errorMessage) + } + f + } + + def commandLineRun(args: Array[String]):Unit = { + val (hostPort: String, jarFile: String) = + if (args.length < 2) { + logInfo("JMX Manager Host and Port (example: localhost:7070):") + val bufferedReader = new java.io.BufferedReader(new java.io.InputStreamReader(System.in)) + val jmxHostAndPort = bufferedReader.readLine() + logInfo("Location of geode-functions.jar:") + val functionJarLocation = bufferedReader.readLine() + (jmxHostAndPort, functionJarLocation) + } else { + (args(0), args(1)) + } + val status = deploy(hostPort, jarFile) + logInfo(status) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala new file mode 100644 index 0000000..0bf7df5 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeKryoRegistrator.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import com.esotericsoftware.kryo.Kryo +import io.pivotal.geode.spark.connector.internal.oql.UndefinedSerializer +import org.apache.spark.serializer.KryoRegistrator +import org.apache.geode.cache.query.internal.Undefined + +class GeodeKryoRegistrator extends KryoRegistrator { + + override def registerClasses(kryo: Kryo): Unit = { + kryo.addDefaultSerializer(classOf[Undefined], classOf[UndefinedSerializer]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala new file mode 100644 index 0000000..ba5d2df --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodePairRDDFunctions.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodePairRDDWriter} +import org.apache.spark.Logging +import org.apache.spark.api.java.function.Function +import org.apache.spark.rdd.RDD + +/** + * Extra Geode functions available on RDDs of (key, value) pairs through an implicit conversion. + * Import `io.pivotal.geode.spark.connector._` at the top of your program to + * use these functions.
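For the GeodeKryoRegistrator above, a sketch of the standard Spark Kryo wiring an application would typically add to its configuration (the property names are Spark's own, not introduced by this patch):

    import org.apache.spark.SparkConf
    import io.pivotal.geode.spark.connector.GeodeKryoRegistrator

    // Enable Kryo and register the connector's serializer for OQL UNDEFINED values.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[GeodeKryoRegistrator].getName)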
+ */ +class GeodePairRDDFunctions[K, V](val rdd: RDD[(K, V)]) extends Serializable with Logging { + + /** + * Save the RDD of pairs to the Geode key-value store without any conversion. + * @param regionPath the full path of the region where the RDD is stored + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + def saveToGeode( + regionPath: String, + connConf: GeodeConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { + connConf.getConnection.validateRegion[K, V](regionPath) + if (log.isDebugEnabled) + logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") + else + logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") + val writer = new GeodePairRDDWriter[K, V](regionPath, connConf, opConf) + rdd.sparkContext.runJob(rdd, writer.write _) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` + * RDD and the Geode `Region[K, V2]`. Each pair of elements will be returned + * as a ((k, v), v2) tuple, where (k, v) is in `this` RDD and (k, v2) is in the + * Geode region. + * + * @param regionPath the region path of the Geode region + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K2 the key type of the Geode region + * @tparam V2 the value type of the Geode region + * @return RDD[(K, V), V2] + */ + def joinGeodeRegion[K2 <: K, V2]( + regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K, V2] = { + new GeodeJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` RDD + * and the Geode `Region[K2, V2]`. The join key from the RDD element is generated by + * `func(K, V) => K2`, and the key from the Geode region is just the key of the + * key/value pair. + * + * Each pair of elements of the result RDD will be returned as a ((k, v), v2) tuple, + * where (k, v) is in `this` RDD and (k2, v2) is in the Geode region. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates the region key from the RDD element (K, V) + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K2 the key type of the Geode region + * @tparam V2 the value type of the Geode region + * @return RDD[(K, V), V2] + */ + def joinGeodeRegion[K2, V2]( + regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = + new GeodeJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf) + + /** This version of joinGeodeRegion(...) is just for the Java API. */ + private[connector] def joinGeodeRegion[K2, V2]( + regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeJoinRDD[(K, V), K2, V2] = { + new GeodeJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf) + } + + /** + * Perform a left outer join of `this` RDD and the Geode `Region[K, V2]`. + * For each element (k, v) in `this` RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None) if no element in the Geode region has key k.
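A short usage sketch for the pair functions above, assuming the connector's locator settings are already on the SparkConf and that a region named str_str_region exists with matching key and value types (all names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // brings GeodePairRDDFunctions into scope

    val sc = new SparkContext(new SparkConf().setAppName("pair-rdd-sketch"))
    val pairs = sc.parallelize(Seq(("1", "one"), ("2", "two"), ("3", "three")))

    // Write the pairs into the region without any conversion.
    pairs.saveToGeode("str_str_region")

    // Inner join against the same region; each element becomes ((k, v), v2).
    val joined = pairs.joinGeodeRegion[String, String]("str_str_region")
    joined.collect().foreach(println)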
+ * + * @param regionPath the region path of the Geode region + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K2 the key type of the Geode region + * @tparam V2 the value type of the Geode region + * @return RDD[ (K, V), Option[V2] ] + */ + def outerJoinGeodeRegion[K2 <: K, V2]( + regionPath: String, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K, V2] = { + new GeodeOuterJoinRDD[(K, V), K, V2](rdd, null, regionPath, connConf) + } + + /** + * Perform a left outer join of `this` RDD and the Geode `Region[K2, V2]`. + * The join key from the RDD element is generated by `func(K, V) => K2`, and the + * key from the region is just the key of the key/value pair. + * + * For each element (k, v) in `this` RDD, the resulting RDD will either contain + * all pairs ((k, v), Some(v2)) for v2 in the Geode region, or the pair + * ((k, v), None) if no element in the Geode region has key `func(k, v)`. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates the region key from the RDD element (K, V) + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K2 the key type of the Geode region + * @tparam V2 the value type of the Geode region + * @return RDD[ (K, V), Option[V2] ] + */ + def outerJoinGeodeRegion[K2, V2]( + regionPath: String, func: ((K, V)) => K2, connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = { + new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func, regionPath, connConf) + } + + /** This version of outerJoinGeodeRegion(...) is just for the Java API. */ + private[connector] def outerJoinGeodeRegion[K2, V2]( + regionPath: String, func: Function[(K, V), K2], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[(K, V), K2, V2] = { + new GeodeOuterJoinRDD[(K, V), K2, V2](rdd, func.call, regionPath, connConf) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(rdd.sparkContext.getConf) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala new file mode 100644 index 0000000..2e5c92a --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeRDDFunctions.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
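The outer-join variants above pair every RDD element with an Option instead of dropping unmatched elements; a sketch under the same assumptions (region name, key function and data are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._

    val sc = new SparkContext(new SparkConf().setAppName("outer-join-sketch"))
    val orders = sc.parallelize(Seq(("o1", 101), ("o2", 102), ("o3", 999)))

    // Left outer join against a "customers" region keyed by the customer id in the value;
    // orders whose id has no entry in the region are paired with None.
    val withNames = orders.outerJoinGeodeRegion[Int, String]("customers", { case (_, custId) => custId })
    withNames.collect().foreach { case ((orderId, _), maybeName) => println(s"$orderId -> $maybeName") }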
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.rdd.{GeodeOuterJoinRDD, GeodeJoinRDD, GeodeRDDWriter} +import org.apache.spark.Logging +import org.apache.spark.api.java.function.{PairFunction, Function} +import org.apache.spark.rdd.RDD + +/** + * Extra Geode functions available on non-pair RDDs through an implicit conversion. + * Import `io.pivotal.geode.spark.connector._` at the top of your program to + * use these functions. + */ +class GeodeRDDFunctions[T](val rdd: RDD[T]) extends Serializable with Logging { + + /** + * Save the non-pair RDD to the Geode key-value store. + * @param regionPath the full path of the region where the RDD is stored + * @param func the function that converts an RDD element to a key/value pair + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @param opConf the optional parameters for this operation + */ + def saveToGeode[K, V]( + regionPath: String, + func: T => (K, V), + connConf: GeodeConnectionConf = defaultConnectionConf, + opConf: Map[String, String] = Map.empty): Unit = { + connConf.getConnection.validateRegion[K, V](regionPath) + if (log.isDebugEnabled) + logDebug(s"""Save RDD id=${rdd.id} to region $regionPath, partitions:\n ${getRddPartitionsInfo(rdd)}""") + else + logInfo(s"""Save RDD id=${rdd.id} to region $regionPath""") + val writer = new GeodeRDDWriter[T, K, V](regionPath, connConf, opConf) + rdd.sparkContext.runJob(rdd, writer.write(func) _) + } + + /** This version of saveToGeode(...) is just for the Java API. */ + private[connector] def saveToGeode[K, V]( + regionPath: String, + func: PairFunction[T, K, V], + connConf: GeodeConnectionConf, + opConf: Map[String, String]): Unit = { + saveToGeode[K, V](regionPath, func.call _, connConf, opConf) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` RDD + * and the Geode `Region[K, V]`. The join key from the RDD element is generated by + * `func(T) => K`, and the key from the Geode region is just the key of the + * key/value pair. + * + * Each pair of elements of the result RDD will be returned as a (t, v) tuple, + * where t is in `this` RDD and (k, v) is in the Geode region. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates the region key from an RDD element T + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K the key type of the Geode region + * @tparam V the value type of the Geode region + * @return RDD[T, V] + */ + def joinGeodeRegion[K, V](regionPath: String, func: T => K, + connConf: GeodeConnectionConf = defaultConnectionConf): GeodeJoinRDD[T, K, V] = { + new GeodeJoinRDD[T, K, V](rdd, func, regionPath, connConf) + } + + /** This version of joinGeodeRegion(...) is just for the Java API. */ + private[connector] def joinGeodeRegion[K, V]( + regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeJoinRDD[T, K, V] = { + joinGeodeRegion(regionPath, func.call _, connConf) + } + + /** + * Perform a left outer join of `this` RDD and the Geode `Region[K, V]`. + * The join key from the RDD element is generated by `func(T) => K`, and the + * key from the region is just the key of the key/value pair.
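A sketch for the non-pair functions above, assuming an emps region keyed by Int with String values; the case class and region name are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector._   // brings GeodeRDDFunctions into scope

    case class Emp(id: Int, name: String)

    val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch"))
    val emps = sc.parallelize(Seq(Emp(1, "Ann"), Emp(2, "Bob")))

    // Save a non-pair RDD by mapping each element to a key/value pair.
    emps.saveToGeode("emps", e => (e.id, e.name))

    // Join back against the region with the same key function; each element becomes (Emp, String).
    val joined = emps.joinGeodeRegion[Int, String]("emps", e => e.id)
    joined.collect().foreach(println)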
+ * + * For each element t in `this` RDD, the resulting RDD will either contain + * all pairs (t, Some(v)) for v in the Geode region, or the pair + * (t, None) if no element in the Geode region has key `func(t)`. + * + * @param regionPath the region path of the Geode region + * @param func the function that generates the region key from an RDD element T + * @param connConf the GeodeConnectionConf object that provides connection to Geode cluster + * @tparam K the key type of the Geode region + * @tparam V the value type of the Geode region + * @return RDD[ T, Option[V] ] + */ + def outerJoinGeodeRegion[K, V](regionPath: String, func: T => K, + connConf: GeodeConnectionConf = defaultConnectionConf): GeodeOuterJoinRDD[T, K, V] = { + new GeodeOuterJoinRDD[T, K, V](rdd, func, regionPath, connConf) + } + + /** This version of outerJoinGeodeRegion(...) is just for the Java API. */ + private[connector] def outerJoinGeodeRegion[K, V]( + regionPath: String, func: Function[T, K], connConf: GeodeConnectionConf): GeodeOuterJoinRDD[T, K, V] = { + outerJoinGeodeRegion(regionPath, func.call _, connConf) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(rdd.sparkContext.getConf) + +} + + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala new file mode 100644 index 0000000..83aab7a --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSQLContextFunctions.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.oql.{OQLRelation, QueryRDD} +import org.apache.spark.Logging +import org.apache.spark.sql.{DataFrame, SQLContext} + +/** + * Provide Geode OQL-specific functions + */ +class GeodeSQLContextFunctions(@transient sqlContext: SQLContext) extends Serializable with Logging { + + /** + * Expose a Geode OQL query result as a DataFrame + * @param query the OQL query string.
+ */ + def geodeOQL( + query: String, + connConf: GeodeConnectionConf = GeodeConnectionConf(sqlContext.sparkContext.getConf)): DataFrame = { + logInfo(s"OQL query = $query") + val rdd = new QueryRDD[Object](sqlContext.sparkContext, query, connConf) + sqlContext.baseRelationToDataFrame(OQLRelation(rdd)(sqlContext)) + } + + private[connector] def defaultConnectionConf: GeodeConnectionConf = + GeodeConnectionConf(sqlContext.sparkContext.getConf) +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala new file mode 100644 index 0000000..617cb33 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/GeodeSparkContextFunctions.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRegionRDD +import org.apache.spark.SparkContext + +import scala.reflect.ClassTag + +/** Provides Geode specific methods on `SparkContext` */ +class GeodeSparkContextFunctions(@transient sc: SparkContext) extends Serializable { + + /** + * Expose a Geode region as a GeodeRDD + * @param regionPath the full path of the region + * @param connConf the GeodeConnectionConf that can be used to access the region + * @param opConf use this to specify preferred partitioner + * and its parameters. 
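For geodeOQL above, a sketch that turns an OQL query into a DataFrame; the query and region are illustrative, and the wrapper is constructed explicitly here (the package import may also expose it as an implicit on SQLContext):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import io.pivotal.geode.spark.connector.GeodeSQLContextFunctions

    val sc = new SparkContext(new SparkConf().setAppName("oql-sketch"))
    val sqlContext = new SQLContext(sc)

    // Runs the OQL query on the cluster and exposes the result as a DataFrame.
    val df = new GeodeSQLContextFunctions(sqlContext).geodeOQL("SELECT e.id, e.name FROM /emps e")
    df.show()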
The implementation will use it if applicable. + */ + def geodeRegion[K: ClassTag, V: ClassTag] ( + regionPath: String, connConf: GeodeConnectionConf = GeodeConnectionConf(sc.getConf), + opConf: Map[String, String] = Map.empty): GeodeRegionRDD[K, V] = + GeodeRegionRDD[K, V](sc, regionPath, connConf, opConf) + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/106cc60f/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala new file mode 100644 index 0000000..b232712 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/main/scala/org/apache/geode/spark/connector/internal/DefaultGeodeConnection.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import java.net.InetAddress + +import org.apache.geode.cache.client.{ClientCache, ClientCacheFactory, ClientRegionShortcut} +import org.apache.geode.cache.execute.{FunctionException, FunctionService} +import org.apache.geode.cache.query.Query +import org.apache.geode.cache.{Region, RegionService} +import org.apache.geode.internal.cache.execute.InternalExecution +import io.pivotal.geode.spark.connector.internal.oql.QueryResultCollector +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartition +import org.apache.spark.{SparkEnv, Logging} +import io.pivotal.geode.spark.connector.GeodeConnection +import io.pivotal.geode.spark.connector.internal.geodefunctions._ +import java.util.{Set => JSet, List => JList } + +/** + * Default GeodeConnection implementation. Instances of this class should be + * created by DefaultGeodeConnectionFactory. + * @param locators pairs of host/port of locators + * @param gemFireProps the initial Geode properties to be used.
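And a sketch for geodeRegion above, which exposes an existing region as an RDD; the region name and type parameters are illustrative and must match the region's contents (the wrapper is constructed explicitly here, though the package import typically provides it implicitly):

    import org.apache.spark.{SparkConf, SparkContext}
    import io.pivotal.geode.spark.connector.GeodeSparkContextFunctions

    val sc = new SparkContext(new SparkConf().setAppName("region-sketch"))

    // Each region entry becomes one (key, value) element of the resulting RDD.
    val regionRDD = new GeodeSparkContextFunctions(sc).geodeRegion[String, String]("str_str_region")
    println(regionRDD.count())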
+ */ +private[connector] class DefaultGeodeConnection ( + locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) + extends GeodeConnection with Logging { + + private val clientCache = initClientCache() + + /** Register Geode functions with the Geode cluster */ + FunctionService.registerFunction(RetrieveRegionMetadataFunction.getInstance()) + FunctionService.registerFunction(RetrieveRegionFunction.getInstance()) + + private def initClientCache() : ClientCache = { + try { + val ccf = getClientCacheFactory + ccf.create() + } catch { + case e: Exception => + logError(s"""Failed to init ClientCache, locators=${locators.mkString(",")}, Error: $e""") + throw new RuntimeException(e) + } + } + + private def getClientCacheFactory: ClientCacheFactory = { + import io.pivotal.geode.spark.connector.map2Properties + val ccf = new ClientCacheFactory(gemFireProps) + ccf.setPoolReadTimeout(30000) + val servers = LocatorHelper.getAllGeodeServers(locators) + if (servers.isDefined && servers.get.size > 0) { + val sparkIp = System.getenv("SPARK_LOCAL_IP") + val hostName = if (sparkIp != null) InetAddress.getByName(sparkIp).getCanonicalHostName + else InetAddress.getLocalHost.getCanonicalHostName + val executorId = SparkEnv.get.executorId + val pickedServers = LocatorHelper.pickPreferredGeodeServers(servers.get, hostName, executorId) + logInfo(s"""Init ClientCache: servers=${pickedServers.mkString(",")}, host=$hostName executor=$executorId props=$gemFireProps""") + logDebug(s"""Init ClientCache: all-servers=${servers.get.mkString(",")}""") + pickedServers.foreach{ case (host, port) => ccf.addPoolServer(host, port) } + } else { + logInfo(s"""Init ClientCache: locators=${locators.mkString(",")}, props=$gemFireProps""") + locators.foreach { case (host, port) => ccf.addPoolLocator(host, port) } + } + ccf + } + + /** close the clientCache */ + override def close(): Unit = + if (! clientCache.isClosed) clientCache.close() + + /** ----------------------------------------- */ + /** implementation of GeodeConnection trait */ + /** ----------------------------------------- */ + + override def getQuery(queryString: String): Query = + clientCache.asInstanceOf[RegionService].getQueryService.newQuery(queryString) + + override def validateRegion[K, V](regionPath: String): Unit = { + val md = getRegionMetadata[K, V](regionPath) + if (!
md.isDefined) throw new RuntimeException(s"The region named $regionPath was not found") + } + + def getRegionMetadata[K, V](regionPath: String): Option[RegionMetadata] = { + import scala.collection.JavaConversions.setAsJavaSet + val region = getRegionProxy[K, V](regionPath) + val set0: JSet[Integer] = Set[Integer](0) + val exec = FunctionService.onRegion(region).asInstanceOf[InternalExecution].withBucketFilter(set0) + exec.setWaitOnExceptionFlag(true) + try { + val collector = exec.execute(RetrieveRegionMetadataFunction.ID) + val r = collector.getResult.asInstanceOf[JList[RegionMetadata]] + logDebug(r.get(0).toString) + Some(r.get(0)) + } catch { + case e: FunctionException => + if (e.getMessage.contains(s"The region named /$regionPath was not found")) None + else throw e + } + } + + def getRegionProxy[K, V](regionPath: String): Region[K, V] = { + val region1: Region[K, V] = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] + if (region1 != null) region1 + else DefaultGeodeConnection.regionLock.synchronized { + val region2 = clientCache.getRegion(regionPath).asInstanceOf[Region[K, V]] + if (region2 != null) region2 + else clientCache.createClientRegionFactory[K, V](ClientRegionShortcut.PROXY).create(regionPath) + } + } + + override def getRegionData[K, V](regionPath: String, whereClause: Option[String], split: GeodeRDDPartition): Iterator[(K, V)] = { + val region = getRegionProxy[K, V](regionPath) + val desc = s"""RDD($regionPath, "${whereClause.getOrElse("")}", ${split.index})""" + val args : Array[String] = Array[String](whereClause.getOrElse(""), desc) + val collector = new StructStreamingResultCollector(desc) + // RetrieveRegionResultCollector[(K, V)] + import scala.collection.JavaConversions.setAsJavaSet + val exec = FunctionService.onRegion(region).withArgs(args).withCollector(collector).asInstanceOf[InternalExecution] + .withBucketFilter(split.bucketSet.map(Integer.valueOf)) + exec.setWaitOnExceptionFlag(true) + exec.execute(RetrieveRegionFunction.ID) + collector.getResult.map{objs: Array[Object] => (objs(0).asInstanceOf[K], objs(1).asInstanceOf[V])} + } + + override def executeQuery(regionPath: String, bucketSet: Set[Int], queryString: String) = { + import scala.collection.JavaConversions.setAsJavaSet + FunctionService.registerFunction(QueryFunction.getInstance()) + val collector = new QueryResultCollector + val region = getRegionProxy(regionPath) + val args: Array[String] = Array[String](queryString, bucketSet.toString) + val exec = FunctionService.onRegion(region).withCollector(collector).asInstanceOf[InternalExecution] + .withBucketFilter(bucketSet.map(Integer.valueOf)) + .withArgs(args) + exec.execute(QueryFunction.ID) + collector.getResult + } +} + +private[connector] object DefaultGeodeConnection { + /** a lock object only used by getRegionProxy...() */ + private val regionLock = new Object +} + +/** The purpose of this class is to make unit testing DefaultGeodeConnectionManager easier */ +class DefaultGeodeConnectionFactory { + + def newConnection(locators: Seq[(String, Int)], gemFireProps: Map[String, String] = Map.empty) = + new DefaultGeodeConnection(locators, gemFireProps) + +}
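Given the comment on DefaultGeodeConnectionFactory above, a sketch of how a test or tool might create a connection directly; the locator host/port and region name are illustrative and assume a running cluster:

    import io.pivotal.geode.spark.connector.internal.DefaultGeodeConnectionFactory

    val factory = new DefaultGeodeConnectionFactory
    val conn = factory.newConnection(Seq(("localhost", 10334)))
    conn.validateRegion[String, String]("str_str_region")  // throws if the region does not exist
    conn.close()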
