http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
new file mode 100644
index 0000000..ee4c338
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import java.util.UUID
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hbase.{HConstants, KeyValue}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.mapreduce.{GraphHFileOutputFormat, HFileOutputFormat2}
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
+import org.apache.hadoop.mapreduce.{Job, Mapper}
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+object HFileMRGenerator extends RawFileGenerator {
+  val DefaultBlockSize = 32768
+  val DefaultConfig = Map(
+    "yarn.app.mapreduce.am.resource.mb" -> 4096,
+    "mapred.child.java.opts" -> "-Xmx8g",
+    "mapreduce.job.running.map.limit" -> 200,
+    "mapreduce.job.running.reduce.limit" -> 200,
+    "mapreduce.reduce.memory.mb" -> 8192,
+    "mapreduce.reduce.java.opts" -> "-Xmx6144m",
+    "mapreduce.map.memory.mb" -> "4096",
+    "mapreduce.map.java.opts" -> "-Xmx3072m",
+    "mapreduce.task.io.sort.factor" -> 10
+  )
+
+  class HFileMapper extends Mapper[ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue] {
+    override def map(key: ImmutableBytesWritable, value: ImmutableBytesWritable,
+                     context: Mapper[ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue]#Context): Unit = {
+      val keyValue = new KeyValue(value.get(), 0, value.get().length)
+      context.write(new ImmutableBytesWritable(keyValue.getRow), keyValue)
+    }
+  }
+
+  def setMROptions(conf: Configuration) = {
+    DefaultConfig.foreach { case (k, v) =>
+      conf.set(k, conf.get(k, v.toString))
+    }
+  }
+
+  def getStartKeys(numRegions: Int): Seq[ImmutableBytesWritable] = {
+    val startKey = AsynchbaseStorageManagement.getStartKey(numRegions)
+    val endKey = AsynchbaseStorageManagement.getEndKey(numRegions)
+    if (numRegions < 3) {
+      throw new IllegalArgumentException("Must create at least three regions")
+    }
+    else if (Bytes.compareTo(startKey, endKey) >= 0) {
+      throw new IllegalArgumentException("Start key must be smaller than end key")
+    }
+    val empty = new Array[Byte](0)
+    val results = if (numRegions == 3) {
+      Seq(empty, startKey, endKey)
+    } else {
+      val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3)
+      if (splitKeys == null || splitKeys.length != numRegions - 1) {
+        throw new IllegalArgumentException("Unable to split key range into enough regions")
+      }
+      Seq(empty) ++ splitKeys.toSeq
+    }
+    results.map(new ImmutableBytesWritable(_))
+  }
+
+  def sortKeyValues(hbaseConf: Configuration,
+                    tableName: String,
+                    input: String,
+                    output: String,
+                    partitionSize: Int,
+                    compressionAlgorithm: String) = {
+    val job = Job.getInstance(hbaseConf, s"sort-MR $tableName")
+    job.setJarByClass(getClass)
+    job.setInputFormatClass(classOf[SequenceFileInputFormat[ImmutableBytesWritable, ImmutableBytesWritable]])
+    job.setMapperClass(classOf[HFileMapper])
+    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
+    job.setOutputValueClass(classOf[KeyValue])
+    job.setOutputFormatClass(classOf[HFileOutputFormat2])
+    job.getConfiguration.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, s"/tmp/${tableName}_staging")
+    FileInputFormat.addInputPaths(job, input)
+    FileOutputFormat.setOutputPath(job, new Path(output))
+
+
+    import scala.collection.JavaConversions._
+
+    GraphHFileOutputFormat.configureIncrementalLoad(job,
+      getStartKeys(partitionSize),
+      Seq("e", "v"),
+      Compression.getCompressionAlgorithmByName(compressionAlgorithm.toLowerCase),
+      BloomType.ROW,
+      DefaultBlockSize,
+      DataBlockEncoding.FAST_DIFF
+    )
+    job
+  }
+
+  def transfer(sc: SparkContext,
+               s2Config: Config,
+               input: RDD[String],
+               graphFileOptions: GraphFileOptions): RDD[KeyValue] = {
+    HFileGenerator.transfer(sc, s2Config, input, graphFileOptions)
+  }
+
+  override def generate(sc: SparkContext,
+                        config: Config,
+                        rdd: RDD[String],
+                        options: GraphFileOptions) = {
+    val merged = transfer(sc, config, rdd, options)
+    val tmpOutput = options.tempDir
+
+    merged.map(x => (new ImmutableBytesWritable(x.getKey), new ImmutableBytesWritable(x.getBuffer)))
+      .saveAsNewAPIHadoopFile(tmpOutput,
+        classOf[ImmutableBytesWritable],
+        classOf[ImmutableBytesWritable],
+        classOf[SequenceFileOutputFormat[ImmutableBytesWritable, ImmutableBytesWritable]])
+
+    sc.killExecutors((1 to sc.defaultParallelism).map(_.toString))
+
+    val conf = sc.hadoopConfiguration
+
+    conf.setClassLoader(Thread.currentThread().getContextClassLoader())
+    setMROptions(conf)
+
+    val job = sortKeyValues(conf, options.tableName, tmpOutput, options.output,
+      options.numRegions, options.compressionAlgorithm)
+
+    val success = job.waitForCompletion(true)
+    FileSystem.get(conf).delete(new Path(tmpOutput), true)
+    if (!success) {
+      throw new RuntimeException("mapreduce failed")
+    }
+  }
+
+}

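For context, a minimal sketch of how this generator might be driven from a Spark job. The object name, the HDFS input path, and the already-populated GraphFileOptions are illustrative assumptions, not part of this patch:

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileMRGenerator}

object BulkLoadJobSketch {
  def run(sc: SparkContext, s2Config: Config, options: GraphFileOptions): Unit = {
    // One bulk-format record per line (hypothetical path); transfer() turns the lines into
    // KeyValues, and the sort MR job then writes them out as HFiles under options.output.
    val input = sc.textFile("/user/example/bulkload-input.tsv")
    HFileMRGenerator.generate(sc, s2Config, input, options)
  }
}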
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
new file mode 100644
index 0000000..1613f20
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs.loader
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.GraphUtil
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
+
+trait RawFileGenerator {
+  def generate(sc: SparkContext,
+               config:Config,
+               rdd: RDD[String],
+               _options:GraphFileOptions)
+
+  def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = {
+    for {
+      msg <- msgs
+      tokens = GraphUtil.split(msg)
+      if tokens(2) == "e" || tokens(2) == "edge"
+      tempDirection = if (tokens.length == 7) "out" else tokens(7)
+      direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
+      reverseDirection = if (direction == "out") "in" else "out"
+      convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5))
+      (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4))
+      degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction)
+      degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection)
+      extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil
+      output <- List(degreeKey -> 1L) ++ extra
+    } yield output
+  }
+}

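The degree computation above can be exercised directly. A rough sketch, assuming the bulk-format column layout implied by the tokens(...) indices (timestamp, operation, "e", from, to, label, props, optional direction) and made-up sample records:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object DegreeSketch {
  def degreeCounts(sc: SparkContext): RDD[(DegreeKey, Long)] = {
    val sampleLines = sc.parallelize(Seq(
      "1500000000000\tinsert\te\tuserA\titemX\tviewed\t{}",      // 7 tokens -> direction defaults to "out"
      "1500000000001\tinsert\te\tuserB\titemY\tviewed\t{}\tin"   // 8 tokens -> explicit "in"
    ))
    // HFileMRGenerator mixes in RawFileGenerator, so buildDegrees is reachable through it.
    HFileMRGenerator.buildDegrees(sampleLines, labelMapping = Map.empty[String, String], edgeAutoCreate = false)
      .reduceByKey(_ + _)   // (DegreeKey, 1L) pairs -> per vertex/label/direction degree counts
  }
}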
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala
new file mode 100644
index 0000000..4ecce80
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import java.util
+import java.util.Comparator
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Partitioner
+
+/**
+ * A Partitioner implementation that will separate records to different
+ * HBase Regions based on region splits
+ *
+ * @param startKeys   The start keys for the given table
+ */
+class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
+  extends Partitioner {
+
+  override def numPartitions: Int = startKeys.length
+
+  override def getPartition(key: Any): Int = {
+
+    val rowKey:Array[Byte] =
+      key match {
+        case qualifier: KeyFamilyQualifier =>
+          qualifier.rowKey
+        case _ =>
+          key.asInstanceOf[Array[Byte]]
+      }
+
+    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+        Bytes.compareTo(o1, o2)
+      }
+    }
+    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0) partition * -1 + -2
+    else partition
+  }
+}

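To make the getPartition arithmetic above concrete, a small REPL-style sketch with three assumed region start keys. binarySearch returns -(insertionPoint) - 1 when the row key is not itself a start key, and the formula maps that back to the index of the preceding region:

import org.apache.hadoop.hbase.util.Bytes

val startKeys: Array[Array[Byte]] =
  Array(new Array[Byte](0), Bytes.toBytes("m"), Bytes.toBytes("t"))  // regions starting at "", "m", "t"
val partitioner = new BulkLoadPartitioner(startKeys)

partitioner.getPartition(Bytes.toBytes("cat"))    // binarySearch -> -2, so (-2 * -1) + -2 = 0
partitioner.getPartition(Bytes.toBytes("m"))      // exact match on a start key -> 1
partitioner.getPartition(Bytes.toBytes("zebra"))  // binarySearch -> -4, so (-4 * -1) + -2 = 2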
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala
new file mode 100644
index 0000000..ea69c53
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import java.io.Serializable
+
+/**
+ * This object will hold optional data for how a given column family's
+ * writer will work
+ *
+ * @param compression       String to define the Compression to be used in the HFile
+ * @param bloomType         String to define the bloom type to be used in the HFile
+ * @param blockSize         The block size to be used in the HFile
+ * @param dataBlockEncoding String to define the data block encoding to be used
+ *                          in the HFile
+ */
+class FamilyHFileWriteOptions( val compression:String,
+                               val bloomType: String,
+                               val blockSize: Int,
+                               val dataBlockEncoding: String) extends Serializable

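A short sketch of supplying these options per column family when calling HBaseContext.bulkLoad later in this patch. The family names "e" and "v" mirror the ones used by HFileMRGenerator above; the concrete values are illustrative:

import java.util
import org.apache.hadoop.hbase.util.Bytes

// Values must parse as the matching HBase enums (Compression.Algorithm, BloomType, DataBlockEncoding).
val familyOptions = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]()
familyOptions.put(Bytes.toBytes("e"), new FamilyHFileWriteOptions("GZ", "ROW", 32768, "FAST_DIFF"))
familyOptions.put(Bytes.toBytes("v"), new FamilyHFileWriteOptions("GZ", "ROW", 32768, "FAST_DIFF"))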
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala
new file mode 100644
index 0000000..d6f6708
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala
@@ -0,0 +1,850 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import java.io._
+import java.net.InetSocketAddress
+import java.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.fs.HFileSystem
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder}
+import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil}
+import org.apache.hadoop.hbase.regionserver.{BloomType, HStore, StoreFile}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.s2graph.s2jobs.spark.HBaseRDDFunctions._
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.{SerializableWritable, SparkContext}
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/**
+  * HBaseContext is a façade for HBase operations
+  * like bulk put, get, increment, delete, and scan
+  *
+  * HBaseContext will take the responsibilities
+  * of disseminating the configuration information
+  * to the workers and managing the life cycle of HConnections.
+ */
+class HBaseContext(@transient private val sc: SparkContext,
+                   @transient private val config: Configuration,
+                   val tmpHdfsConfgFile: String = null)
+  extends Serializable {
+
+  @transient var credentials = UserGroupInformation.getCurrentUser().getCredentials
+  @transient var tmpHdfsConfiguration:Configuration = config
+  @transient var appliedCredentials = false
+  @transient val job = Job.getInstance(config)
+  TableMapReduceUtil.initCredentials(job)
+  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
+  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
+
+  LatestHBaseContextCache.latest = this
+
+  if (tmpHdfsConfgFile != null && config != null) {
+    val fs = FileSystem.newInstance(config)
+    val tmpPath = new Path(tmpHdfsConfgFile)
+    if (!fs.exists(tmpPath)) {
+      val outputStream = fs.create(tmpPath)
+      config.write(outputStream)
+      outputStream.close()
+    } else {
+//      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
+    }
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param f    Function to be given an iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   */
+  def foreachPartition[T](rdd: RDD[T],
+                          f: (Iterator[T], Connection) => Unit):Unit = {
+    rdd.foreachPartition(
+      it => hbaseForeachPartition(broadcastedConf, it, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming dStream foreach
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f        Function to be given an iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   */
+  def foreachPartition[T](dstream: DStream[T],
+                    f: (Iterator[T], Connection) => Unit):Unit = {
+    dstream.foreachRDD((rdd, time) => {
+      foreachPartition(rdd, f)
+    })
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param mp   Function to be given an iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   * @return     Returns a new RDD generated by the user definition
+   *             function just like normal mapPartition
+   */
+  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
+                                   mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
+
+    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
+      it,
+      mp))
+
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * foreachPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamForeachPartition[T](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Unit): Unit = {
+
+    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Iterator[U]):
+  DStream[U] = {
+    dstream.mapPartitions(it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      f))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows a user to take an RDD
+   * and generate puts and send them to HBase.
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the RDD to a HBase Put
+   */
+  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val m = connection.getBufferedMutator(TableName.valueOf(tName))
+          iterator.foreach(T => m.mutate(f(T)))
+          m.flush()
+          m.close()
+        }))
+  }
+
+  def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){
+    credentials = UserGroupInformation.getCurrentUser().getCredentials
+
+//    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
+
+    if (!appliedCredentials && credentials != null) {
+      appliedCredentials = true
+
+      @transient val ugi = UserGroupInformation.getCurrentUser
+      ugi.addCredentials(credentials)
+      // specify that this is a proxy user
+      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
+
+      ugi.addCredentials(credentialsConf.value.value)
+    }
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMapPartition method.
+   *
+   * It allows a user to take a DStream and
+   * generate puts and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to put into
+   * @param f          Function to convert a value in
+   *                   the DStream to a HBase Put
+   */
+  def streamBulkPut[T](dstream: DStream[T],
+                       tableName: TableName,
+                       f: (T) => Put) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkPut(rdd, TableName.valueOf(tName), f)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows a user to take an RDD and generate Deletes
+   * and send them to HBase.  The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the RDD to a
+   *                  HBase Deletes
+   * @param batchSize       The number of deletes to batch before sending to HBase
+   */
+  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
+                    f: (T) => Delete, batchSize: Integer) {
+    bulkMutation(rdd, tableName, f, batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allows a user to take a DStream and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to delete from
+   * @param f          function to convert a value in the DStream to a
+   *                   HBase Delete
+   * @param batchSize        The number of deletes to batch before sending to HBase
+   */
+  def streamBulkDelete[T](dstream: DStream[T],
+                          tableName: TableName,
+                          f: (T) => Delete,
+                          batchSize: Integer) = {
+    streamBulkMutation(dstream, tableName, f, batchSize)
+  }
+
+  /**
+   *  Underlying function to support all bulk mutations
+   *
+   *  May be opened up if requested
+   */
+  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
+                              f: (T) => Mutation, batchSize: Integer) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val table = connection.getTable(TableName.valueOf(tName))
+          val mutationList = new java.util.ArrayList[Mutation]
+          iterator.foreach(T => {
+            mutationList.add(f(T))
+            if (mutationList.size >= batchSize) {
+              table.batch(mutationList, null)
+              mutationList.clear()
+            }
+          })
+          if (mutationList.size() > 0) {
+            table.batch(mutationList, null)
+            mutationList.clear()
+          }
+          table.close()
+        }))
+  }
+
+  /**
+   *  Underlying function to support all bulk streaming mutations
+   *
+   *  May be opened up if requested
+   */
+  private def streamBulkMutation[T](dstream: DStream[T],
+                                    tableName: TableName,
+                                    f: (T) => Mutation,
+                                    batchSize: Integer) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allows a user to take an RDD and generate a
+   * new RDD based on Gets and the results they bring back from HBase
+   *
+   * @param rdd     Original RDD with data to iterate over
+   * @param tableName        The name of the table to get from
+   * @param makeGet    function to convert a value in the RDD to a
+   *                   HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                   whatever the user wants to put in the resulting
+   *                   RDD
+   * @return           new RDD that is created by the Get to HBase
+   */
+  def bulkGet[T, U: ClassTag](tableName: TableName,
+                    batchSize: Integer,
+                    rdd: RDD[T],
+                    makeGet: (T) => Get,
+                    convertResult: (Result) => U): RDD[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    rdd.mapPartitions[U](it =>
+      hbaseMapPartition[T, U](
+        broadcastedConf,
+        it,
+        getMapPartition.run))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMap method.
+   *
+   * It allows a user to take a DStream and
+   * generate a new DStream based on Gets and the results
+   * they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of Gets to be sent in a single batch
+   * @param dStream       Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the DStream to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      DStream
+   * @return              A new DStream that is created by the Get to HBase
+   */
+  def streamBulkGet[T, U: ClassTag](tableName: TableName,
+                                    batchSize: Integer,
+                                    dStream: DStream[T],
+                                    makeGet: (T) => Get,
+                                    convertResult: (Result) => U): DStream[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      getMapPartition.run))
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scan      the HBase scan object to use to read data from HBase
+   *  @param f         function to convert a Result object from HBase into
+   *                   what the user wants in the final generated RDD
+   *  @return          new RDD with results from scan
+   */
+  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
+                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+
+    val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+    TableMapReduceUtil.initCredentials(job)
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+      classOf[IdentityTableMapper], null, null, job)
+
+    sc.newAPIHadoopRDD(job.getConfiguration,
+      classOf[TableInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result]).map(f)
+  }
+
+  /**
+   * An overloaded version of HBaseContext hbaseRDD that defines the
+   * type of the resulting RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scans     the HBase scan object to use to read data from HBase
+   *  @return          New RDD with results from scan
+   *
+   */
+  def hbaseRDD(tableName: TableName, scans: Scan):
+  RDD[(ImmutableBytesWritable, Result)] = {
+
+    hbaseRDD[(ImmutableBytesWritable, Result)](
+      tableName,
+      scans,
+      (r: (ImmutableBytesWritable, Result)) => r)
+  }
+
+  /**
+   *  Underlying wrapper for all foreach functions in HBaseContext
+   */
+  private def hbaseForeachPartition[T](configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                        it: Iterator[T],
+                                        f: (Iterator[T], Connection) => Unit) = {
+
+    val config = getConf(configBroadcast)
+
+    applyCreds(configBroadcast)
+    // specify that this is a proxy user
+    val connection = ConnectionFactory.createConnection(config)
+    f(it, connection)
+    connection.close()
+  }
+
+  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
+  Configuration = {
+
+    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
+      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
+      tmpHdfsConfiguration = new Configuration(false)
+      tmpHdfsConfiguration.readFields(inputStream)
+      inputStream.close()
+    }
+
+    if (tmpHdfsConfiguration == null) {
+      try {
+        tmpHdfsConfiguration = configBroadcast.value.value
+      } catch {
+        case ex: Exception =>
+//          logError("Unable to getConfig from broadcast", ex)
+      }
+    }
+    tmpHdfsConfiguration
+  }
+
+  /**
+   *  Underlying wrapper for all mapPartition functions in HBaseContext
+   *
+   */
+  private def hbaseMapPartition[K, U](
+                                       configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                       it: Iterator[K],
+                                       mp: (Iterator[K], Connection) =>
+                                         Iterator[U]): Iterator[U] = {
+
+    val config = getConf(configBroadcast)
+    applyCreds(configBroadcast)
+
+    val connection = ConnectionFactory.createConnection(config)
+    val res = mp(it, connection)
+    connection.close()
+    res
+
+  }
+
+  /**
+   *  Underlying wrapper for all get mapPartition functions in HBaseContext
+   */
+  private class GetMapPartition[T, U](tableName: TableName,
+                                      batchSize: Integer,
+                                      makeGet: (T) => Get,
+                                      convertResult: (Result) => U)
+    extends Serializable {
+
+    val tName = tableName.getName
+
+    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
+      val table = connection.getTable(TableName.valueOf(tName))
+
+      val gets = new java.util.ArrayList[Get]()
+      var res = List[U]()
+
+      while (iterator.hasNext) {
+        gets.add(makeGet(iterator.next()))
+
+        if (gets.size() == batchSize) {
+          val results = table.get(gets)
+          res = res ++ results.map(convertResult)
+          gets.clear()
+        }
+      }
+      if (gets.size() > 0) {
+        val results = table.get(gets)
+        res = res ++ results.map(convertResult)
+        gets.clear()
+      }
+      table.close()
+      res.iterator
+    }
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as
+   * the Java compiler cannot produce them automatically. While this
+   * ClassTag-faking does please the compiler, it can cause problems at runtime
+   * if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /**
+   * A Spark Implementation of HBase Bulk load
+   *
+   * This will take the content from an existing RDD then sort and shuffle
+   * it with respect to region splits.  The result of that sort and shuffle
+   * will be written to HFiles.
+   *
+   * After this function is executed the user will have to call
+   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+   *
+   * Also note this version of bulk load is different from past versions in
+   * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows with a very large number
+   * of columns.
+   *
+   * @param rdd                            The RDD we are bulk loading from
+   * @param tableName                      The HBase table we are loading into
+   * @param startKeys                      The region start keys of the table that is being loaded into
+   * @param flatMap                        A flatMap function that will make every
+   *                                       row in the RDD
+   *                                       into N cells for the bulk load
+   * @param stagingDir                     The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+   *                                       column family is written
+   * @param compactionExclude              Compaction excluded for the HFiles
+   * @param maxSize                        Max size for the HFiles before they roll
+   * @tparam T                             The Type of values in the original RDD
+   */
+  def bulkLoad[T](rdd:RDD[T],
+                  tableName: TableName,
+                  startKeys: Array[Array[Byte]],
+                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                  stagingDir:String,
+                  familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean = false,
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+  Unit = {
+//    val conn = ConnectionFactory.createConnection(config)
+//    val regionLocator = conn.getRegionLocator(tableName)
+//    val startKeys = regionLocator.getStartKeys
+    val defaultCompressionStr = config.get("hfile.compression",
+      Compression.Algorithm.NONE.getName)
+    val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr)
+//      HFileWriterImpl
+//      .compressionByName(defaultCompressionStr)
+    val now = System.currentTimeMillis()
+    val tableNameByteArray = tableName.getName
+
+    val familyHFileWriteOptionsMapInternal =
+      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+    while (entrySetIt.hasNext) {
+      val entry = entrySetIt.next()
+      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+    }
+
+    /*
+     *  This will return a new HFile writer when requested
+     *
+     * @param family       column family
+     * @param conf         configuration to connect to HBase
+     * @param favoredNodes nodes that we would like to write too
+     * @param fs           FileSystem object where we will be writing the HFiles to
+     * @return WriterLength object
+     */
+    def getNewWriter(family: Array[Byte], conf: Configuration,
+                     favoredNodes: Array[InetSocketAddress],
+                     fs:FileSystem,
+                     familydir:Path): WriterLength = {
+
+
+      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
+
+      if (familyOptions == null) {
+        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
+      }
+
+      val tempConf = new Configuration(conf)
+      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+      val contextBuilder = new HFileContextBuilder()
+        .withCompression(Algorithm.valueOf(familyOptions.compression))
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(familyOptions.blockSize)
+      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+        valueOf(familyOptions.dataBlockEncoding))
+      val hFileContext = contextBuilder.build()
+
+      if (null == favoredNodes) {
+        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build())
+//          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+      } else {
+        new WriterLength(0,
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+            .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
+//          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+          .withFavoredNodes(favoredNodes).build())
+      }
+    }
+
+    val regionSplitPartitioner =
+      new BulkLoadPartitioner(startKeys)
+
+    //This is where all the magic happens
+    //Here we are going to do the following things
+    // 1. FlatMap every row in the RDD into key column value tuples
+    // 2. Then we are going to repartition sort and shuffle
+    // 3. Finally we are going to write out our HFiles
+    rdd.flatMap( r => flatMap(r)).
+      repartitionAndSortWithinPartitions(regionSplitPartitioner).
+      hbaseForeachPartition(this, (it, conn) => {
+
+      val conf = broadcastedConf.value.value
+      val fs = FileSystem.get(conf)
+      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+      var rollOverRequested = false
+
+      /*
+       * This will roll all writers
+       */
+      def rollWriters(): Unit = {
+        writerMap.values.foreach( wl => {
+          if (wl.writer != null) {
+//            logDebug("Writer=" + wl.writer.getPath +
+//              (if (wl.written == 0) "" else ", wrote=" + wl.written))
+            close(wl.writer)
+          }
+        })
+        writerMap.clear()
+        rollOverRequested = false
+      }
+
+      /*
+       * This function will close a given HFile writer
+       * @param w The writer to close
+       */
+      def close(w:StoreFile.Writer): Unit = {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()))
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+            Bytes.toBytes(true))
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude))
+          w.appendTrackedTimestampsToMetadata()
+          w.close()
+        }
+      }
+
+      //Here is where we finally iterate through the data in this partition of the
+      //RDD that has been sorted and partitioned
+      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+        //This will get a writer for the column family
+        //If there is no writer for a given column family then
+        //it will get created here.
+        val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
+
+          val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
+
+          fs.mkdirs(familyDir)
+
+          val loc:HRegionLocation = {
+            try {
+              val locator =
+                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
+              locator.getRegionLocation(keyFamilyQualifier.rowKey)
+            } catch {
+              case e: Throwable =>
+//              logWarning("there's something wrong when locating rowkey: " +
+//                Bytes.toString(keyFamilyQualifier.rowKey))
+                null
+            }
+          }
+          if (null == loc) {
+//            if (log.isTraceEnabled) {
+//              logTrace("failed to get region location, so use default writer: " +
+//                Bytes.toString(keyFamilyQualifier.rowKey))
+//            }
+            getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
+              fs = fs, familydir = familyDir)
+          } else {
+//            if (log.isDebugEnabled) {
+//              logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
+//            }
+            val initialIsa =
+              new InetSocketAddress(loc.getHostname, loc.getPort)
+            if (initialIsa.isUnresolved) {
+//              if (log.isTraceEnabled) {
+//                logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+//                  + loc.getPort + ", so use default writer")
+//              }
+              getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
+            } else {
+//              if(log.isDebugEnabled) {
+//                logDebug("use favored nodes writer: " + initialIsa.getHostString)
+//              }
+              getNewWriter(keyFamilyQualifier.family, conf,
+                Array[InetSocketAddress](initialIsa), fs, familyDir)
+            }
+          }
+        })
+
+        val keyValue =new KeyValue(keyFamilyQualifier.rowKey,
+          keyFamilyQualifier.family,
+          keyFamilyQualifier.qualifier,
+          now,cellValue)
+
+        wl.writer.append(keyValue)
+        wl.written += keyValue.getLength
+
+        rollOverRequested = rollOverRequested || wl.written > maxSize
+
+        //This will only roll if we have at least one column family file that is
+        //bigger than maxSize and we have finished a given row key
+        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+          rollWriters()
+        }
+
+        previousRow = keyFamilyQualifier.rowKey
+      }
+      //We have finished all the data so lets close up the writers
+      rollWriters()
+    })
+  }
+
+  /**
+   * This is a wrapper class around StoreFile.Writer.  The reason for the
+   * wrapper is to keep the length of the file alongside the writer
+   *
+   * @param written The number of bytes written to the writer
+   * @param writer  The writer to be wrapped
+   */
+  class WriterLength(var written:Long, val writer:StoreFile.Writer)
+
+  /**
+   * This is a wrapper over a byte array so it can work as
+   * a key in a hashMap
+   *
+   * @param o1 The Byte Array value
+   */
+  class ByteArrayWrapper (val o1:Array[Byte])
+    extends Comparable[ByteArrayWrapper] with Serializable {
+    override def compareTo(o2: ByteArrayWrapper): Int = {
+      Bytes.compareTo(o1,o2.o1)
+    }
+    override def equals(o2: Any): Boolean = {
+      o2 match {
+        case wrapper: ByteArrayWrapper =>
+          Bytes.equals(o1, wrapper.o1)
+        case _ =>
+          false
+      }
+    }
+    override def hashCode():Int = {
+      Bytes.hashCode(o1)
+    }
+  }
+}
+
+object LatestHBaseContextCache {
+  var latest:HBaseContext = null
+}

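As a quick illustration of the bulk-put path above, a sketch that assumes an existing SparkContext `sc`, an HBase cluster reachable via HBaseConfiguration.create(), and a pre-created table and column family (the names are illustrative):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())
val rows = sc.parallelize(Seq(("row1", "value1"), ("row2", "value2")))

// Each record becomes a Put; connection setup and teardown happen once per partition inside HBaseContext.
hbaseContext.bulkPut[(String, String)](rows, TableName.valueOf("test_table"), { case (rowKey, value) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
  put
})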
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala
new file mode 100644
index 0000000..9faaecd
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s2graph.s2jobs.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseDStreamFunctions contains a set of implicit functions that can be
+ * applied to a Spark DStream so that we can easily interact with HBase
+ */
+object HBaseDStreamFunctions {
+
+  /**
+   * These are implicit methods for a DStream that contains any type of
+   * data.
+   *
+   * @param dStream  This is for dStreams of any type
+   * @tparam T       Type T
+   */
+  implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new Stream.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the DStream values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.streamBulkPut(dStream, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function.  In that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting DStream
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting DStream
+     * @return               A resulting DStream with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                     tableName: TableName,
+                     batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
+    DStream[R] = {
+      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function.  In that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the put will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @return               A resulting DStream with type R objects
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                     tableName: TableName, batchSize:Int,
+                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+        hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+          tableName, batchSize, dStream, f,
+          result => (new ImmutableBytesWritable(result.getRow), result))
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new DStream.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the DStream value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName,
+                        f:(T) => Delete, batchSize:Int): Unit = {
+      hc.streamBulkDelete(dStream, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will act very much like a normal DStream
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            DStream along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.streamForeachPartition(dStream, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will act very much like a normal DStream
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            DStream along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            DStream
+     * @return    A resulting DStream of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    DStream[R] = {
+      hc.streamMapPartitions(dStream, f)
+    }
+  }
+}

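And the streaming counterpart: the implicit class above lets the same bulk put be called directly on a DStream. A sketch assuming an existing StreamingContext `ssc`, the `hbaseContext` built earlier, and an illustrative socket source and table:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.s2jobs.spark.HBaseDStreamFunctions._
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[(String, String)] =
  ssc.socketTextStream("localhost", 9999).map(line => (line, line))

// GenericHBaseDStreamFunctions adds hbaseBulkPut (and friends) directly on the DStream.
stream.hbaseBulkPut(hbaseContext, TableName.valueOf("test_table"), { case (rowKey, value) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
  put
})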
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..328587c
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import java.util
+
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.{HConstants, TableName}
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions
+{
+
+  /**
+   * These are implicit methods for a RDD that contains any type of
+   * data.
+   *
+   * @param rdd This is for rdd of any type
+   * @tparam T  This is any type
+   */
+  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new RDD.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the RDD values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.bulkPut(rdd, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think of it as an RDD map
+     * function, in that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the Gets will be sent to
+     * @param batchSize      How many Gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting RDD
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting RDD
+     * @return               A resulting RDD with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                            tableName: TableName, batchSize:Int,
+                            f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think of it as an RDD map
+     * function, in that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the Gets will be sent to
+     * @param batchSize      How many Gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @return               A resulting RDD of (ImmutableBytesWritable, Result) pairs
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                                  tableName: TableName, batchSize:Int,
+                                  f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+        batchSize, rdd, f,
+        result => if (result != null && result.getRow != null) {
+          (new ImmutableBytesWritable(result.getRow), result)
+        } else {
+          null
+        })
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new RDD.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the RDD value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
+      hc.bulkDelete(rdd, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will act very much like a normal RDD
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.foreachPartition(rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will act very much like a normal RDD
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            RDD
+     * @return    A resulting RDD of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    RDD[R] = {
+      hc.mapPartitions[T,R](rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoad method.
+     *
+     * A Spark Implementation of HBase Bulk load
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits.  The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * Also note this version of bulk load is different from past versions in
+     * that it includes the qualifier as part of the sort process. The
+     * reason for this is to be able to support rows with a very large number
+     * of columns.
+     *
+     * @param tableName                      The HBase table we are loading into
+     * @param flatMap                        A flatMap function that will make every row in the RDD
+     *                                       into N cells for the bulk load
+     * @param stagingDir                     The location on the FileSystem to bulk load into
+     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+     *                                       column family is written
+     * @param compactionExclude              Whether the generated HFiles should be excluded from compaction
+     * @param maxSize                        Max size for the HFiles before they roll
+     */
+    def hbaseBulkLoad(hc: HBaseContext,
+                         tableName: TableName,
+                         startKeys: Array[Array[Byte]],
+                         flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                         stagingDir:String,
+                         familyHFileWriteOptionsMap:
+                         util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                         new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                         compactionExclude: Boolean = false,
+                         maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+      hc.bulkLoad(rdd, tableName, startKeys,
+        flatMap, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
+  }
+}
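A short sketch of the RDD implicits above (not part of the commit). It assumes an existing HBase table named "test_table" with a column family "cf"; the qualifier "q" and values are likewise illustrative:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.s2graph.s2jobs.spark.HBaseContext
import org.apache.s2graph.s2jobs.spark.HBaseRDDFunctions._

object RddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-hbase-sketch"))
    val hc = new HBaseContext(sc, HBaseConfiguration.create())
    val table = TableName.valueOf("test_table")

    val rows = sc.parallelize(Seq("row1" -> "value1", "row2" -> "value2"))

    // hbaseBulkPut: one Put per element, written into family "cf", qualifier "q".
    rows.hbaseBulkPut(hc, table, { case (key, value) =>
      new Put(Bytes.toBytes(key))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
    })

    // hbaseBulkGet with a convertResult function: 10 Gets per batch, Result -> String.
    val fetched = rows.hbaseBulkGet[String](hc, table, 10,
      { case (key, _) => new Get(Bytes.toBytes(key)) },
      result => Bytes.toStringBinary(result.getRow))

    fetched.collect().foreach(println)
    sc.stop()
  }
}

An hbaseBulkLoad call would additionally need the target table's region start keys and a staging directory, and, as the scaladoc above notes, the generated HFiles still have to be moved into HBase with LoadIncrementalHFiles.doBulkLoad(...).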

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala
new file mode 100644
index 0000000..d2dc7d3
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.streaming.api.java.JavaDStream
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+/**
+ * This is the Java Wrapper over HBaseContext which is written in
+ * Scala.  This class will be used by developers that want to
+ * work with Spark or Spark Streaming in Java
+ *
+ * @param jsc    This is the JavaSparkContext that we will wrap
+ * @param config This is the config information for our HBase cluster
+ */
+class JavaHBaseContext(@transient private val jsc: JavaSparkContext,
+                       @transient private val config: Configuration) extends Serializable {
+  val hbaseContext = new HBaseContext(jsc.sc, config)
+
+  /**
+   * A simple enrichment of the traditional Spark javaRdd foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param javaRdd Original javaRdd with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                the RDD values and a HConnection object to interact
+   *                with HBase
+   */
+  def foreachPartition[T](javaRdd: JavaRDD[T],
+                          f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
+
+    hbaseContext.foreachPartition(javaRdd.rdd,
+      (it: Iterator[T], conn: Connection) => {
+        f.call((it, conn))
+      })
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming dStream foreach
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param javaDstream Original DStream with data to iterate over
+   * @param f           Function to be given an iterator to iterate through
+   *                    the JavaDStream values and a HConnection object to
+   *                    interact with HBase
+   */
+  def foreachPartition[T](javaDstream: JavaDStream[T],
+                          f: VoidFunction[(Iterator[T], Connection)]) = {
+    hbaseContext.foreachPartition(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) => f.call((it, conn)))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark JavaRDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase
+   *
+   * @param javaRdd Original JavaRdd with data to iterate over
+   * @param f       Function to be given an iterator to iterate through
+   *                the RDD values and a HConnection object to interact
+   *                with HBase
+   * @return        Returns a new RDD generated by the user definition
+   *                function just like normal mapPartition
+   */
+  def mapPartitions[T, R](javaRdd: JavaRDD[T],
+                          f: FlatMapFunction[(java.util.Iterator[T],
+                            Connection), R]): JavaRDD[R] = {
+
+    def fn = (it: Iterator[T], conn: Connection) =>
+      asScalaIterator(
+        f.call((asJavaIterator(it), conn))
+      )
+
+    JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
+      (iterator: Iterator[T], connection: Connection) =>
+        fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R])
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming JavaDStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to an already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issues when
+   * getting data from HBase
+   *
+   * @param javaDstream Original JavaDStream with data to iterate over
+   * @param mp          Function to be given an iterator to iterate through
+   *                    the JavaDStream values and a HConnection object to
+   *                    interact with HBase
+   * @return            Returns a new JavaDStream generated by the user
+   *                    definition function just like normal mapPartition
+   */
+  def streamMap[T, U](javaDstream: JavaDStream[T],
+                      mp: Function[(Iterator[T], Connection), Iterator[U]]):
+  JavaDStream[U] = {
+    JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
+      (it: Iterator[T], conn: Connection) =>
+        mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows a user to take a JavaRDD
+   * and generate Puts and send them to HBase.
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaRdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the JavaRDD
+   *                  to a HBase Put
+   */
+  def bulkPut[T](javaRdd: JavaRDD[T],
+                 tableName: TableName,
+                 f: Function[(T), Put]) {
+
+    hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMapPartition method.
+   *
+   * It allows a user to take a JavaDStream and
+   * generate Puts and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDstream Original DStream with data to iterate over
+   * @param tableName   The name of the table to put into
+   * @param f           Function to convert a value in
+   *                    the JavaDStream to a HBase Put
+   */
+  def streamBulkPut[T](javaDstream: JavaDStream[T],
+                       tableName: TableName,
+                       f: Function[T, Put]) = {
+    hbaseContext.streamBulkPut(javaDstream.dstream,
+      tableName,
+      (t: T) => f.call(t))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allows a user to take a JavaRDD and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaRdd   Original JavaRDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the JavaRDD to an
+   *                  HBase Delete
+   * @param batchSize The number of deletes to batch before sending to HBase
+   */
+  def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
+                    f: Function[T, Delete], batchSize: Integer) {
+    hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allows a user to take a JavaDStream and
+   * generate Deletes and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param javaDStream Original DStream with data to iterate over
+   * @param tableName   The name of the table to delete from
+   * @param f           Function to convert a value in the JavaDStream to a
+   *                    HBase Delete
+   * @param batchSize   The number of deletes to be sent at once
+   */
+  def streamBulkDelete[T](javaDStream: JavaDStream[T],
+                          tableName: TableName,
+                          f: Function[T, Delete],
+                          batchSize: Integer) = {
+    hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
+      (t: T) => f.call(t),
+      batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allows a user to take a JavaRDD and generate a
+   * new RDD based on Gets and the results they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     batch size of how many gets to retrieve in a single fetch
+   * @param javaRdd       Original JavaRDD with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaRDD to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaRDD
+   * @return              New JavaRDD that is created by the Get to HBase
+   */
+  def bulkGet[T, U](tableName: TableName,
+                    batchSize: Integer,
+                    javaRdd: JavaRDD[T],
+                    makeGet: Function[T, Get],
+                    convertResult: Function[Result, U]): JavaRDD[U] = {
+
+    JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
+      batchSize,
+      javaRdd.rdd,
+      (t: T) => makeGet.call(t),
+      (r: Result) => {
+        convertResult.call(r)
+      })(fakeClassTag[U]))(fakeClassTag[U])
+
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMap method.
+   *
+   * It allows a user to take a DStream and
+   * generate a new DStream based on Gets and the results
+   * they bring back from HBase
+   *
+
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of gets to be batched together
+   * @param javaDStream   Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the JavaDStream to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      whatever the user wants to put in the resulting
+   *                      JavaDStream
+   * @return              New JavaDStream that is created by the Get to HBase
+   */
+  def streamBulkGet[T, U](tableName: TableName,
+                          batchSize: Integer,
+                          javaDStream: JavaDStream[T],
+                          makeGet: Function[T, Get],
+                          convertResult: Function[Result, U]): JavaDStream[U] = {
+    JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
+      batchSize,
+      javaDStream.dstream,
+      (t: T) => makeGet.call(t),
+      (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new JavaRDD
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @param f         Function to convert a Result object from HBase into
+   *                  what the user wants in the final generated JavaRDD
+   * @return          New JavaRDD with results from scan
+   */
+  def hbaseRDD[U](tableName: TableName,
+                  scans: Scan,
+                  f: Function[(ImmutableBytesWritable, Result), U]):
+  JavaRDD[U] = {
+    JavaRDD.fromRDD(
+      hbaseContext.hbaseRDD[U](tableName,
+        scans,
+        (v: (ImmutableBytesWritable, Result)) =>
+          f.call((v._1, v._2)))(fakeClassTag[U]))(fakeClassTag[U])
+  }
+
+  /**
+   * An overloaded version of HBaseContext.hbaseRDD that defines the
+   * type of the resulting JavaRDD
+   *
+   * @param tableName The name of the table to scan
+   * @param scans     The HBase scan object to use to read data from HBase
+   * @return          New JavaRDD with results from scan
+   */
+  def hbaseRDD(tableName: TableName,
+               scans: Scan):
+  JavaRDD[(ImmutableBytesWritable, Result)] = {
+    JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as the Java compiler
+   * cannot produce them automatically. While this ClassTag-faking does please the compiler,
+   * it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array[AnyRef] can hold any type T,
+   * but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}
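Although JavaHBaseContext targets Java callers, a quick sketch in Scala (the module's language, for consistency with the other examples here) shows how it is driven through a JavaSparkContext. It is not part of the commit; the table name "test_table", family "cf", qualifier "q", and value "v" are assumptions:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.s2graph.s2jobs.spark.JavaHBaseContext

object JavaContextSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(new SparkConf().setAppName("java-hbase-sketch"))
    val hc = new JavaHBaseContext(jsc, HBaseConfiguration.create())

    val javaRdd = jsc.parallelize(java.util.Arrays.asList("row1", "row2"))

    // bulkPut takes a Spark Java API Function rather than a Scala lambda.
    hc.bulkPut(javaRdd, TableName.valueOf("test_table"),
      new JFunction[String, Put] {
        override def call(key: String): Put =
          new Put(Bytes.toBytes(key))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"))
      })

    jsc.stop()
  }
}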

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala
new file mode 100644
index 0000000..1e05340
--- /dev/null
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.s2jobs.spark
+
+import java.io.Serializable
+
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * This is the key to be used for sorting and shuffling.
+ *
+ * We will only partition on the rowKey but we will sort on all three
+ *
+ * @param rowKey    Record RowKey
+ * @param family    Record ColumnFamily
+ * @param qualifier Cell Qualifier
+ */
+class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte])
+  extends Comparable[KeyFamilyQualifier] with Serializable {
+  override def compareTo(o: KeyFamilyQualifier): Int = {
+    var result = Bytes.compareTo(rowKey, o.rowKey)
+    if (result == 0) {
+      result = Bytes.compareTo(family, o.family)
+      if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier)
+    }
+    result
+  }
+  override def toString: String = {
+    Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+  }
+}
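A small sketch (not part of the commit) of the ordering contract described above; the family name "e" and the qualifiers are arbitrary:

import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.s2jobs.spark.KeyFamilyQualifier

object KeyFamilyQualifierSketch {
  def main(args: Array[String]): Unit = {
    val cells = Seq(
      new KeyFamilyQualifier(Bytes.toBytes("row2"), Bytes.toBytes("e"), Bytes.toBytes("a")),
      new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("e"), Bytes.toBytes("b")),
      new KeyFamilyQualifier(Bytes.toBytes("row1"), Bytes.toBytes("e"), Bytes.toBytes("a")))

    // compareTo orders by rowKey, then family, then qualifier, which is the
    // cell ordering the bulk-load sort and the resulting HFiles rely on.
    cells.sortWith(_.compareTo(_) < 0).foreach(println)
    // prints: row1:e:a, row1:e:b, row2:e:a
  }
}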

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
new file mode 100644
index 0000000..6e89466
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.s2jobs
+
+class S2GraphHelperTest {
+
+}
