http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/DefaultSource.scala 
b/loader/src/main/scala/spark/DefaultSource.scala
deleted file mode 100644
index cc36a15..0000000
--- a/loader/src/main/scala/spark/DefaultSource.scala
+++ /dev/null
@@ -1,982 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package spark
-//
-//import java.util
-//import java.util.concurrent.ConcurrentLinkedQueue
-//
-//import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan}
-//import org.apache.hadoop.hbase.types._
-//import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, 
PositionedByteRange, Bytes}
-//import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
-//import org.apache.spark.Logging
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.types.DataType
-//import org.apache.spark.sql.{Row, SQLContext}
-//import org.apache.spark.sql.sources._
-//import org.apache.spark.sql.types._
-//
-//import scala.collection.mutable
-//
-///**
-// * DefaultSource for integration with Spark's dataframe datasources.
-// * This class will produce a relationProvider based on input given to it 
from spark
-// *
-// * In all this DefaultSource support the following datasource functionality
-// * - Scan range pruning through filter push down logic based on rowKeys
-// * - Filter push down logic on HBase Cells
-// * - Qualifier filtering based on columns used in the SparkSQL statement
-// * - Type conversions of basic SQL types.  All conversions will be
-// *   Through the HBase Bytes object commands.
-// */
-//class DefaultSource extends RelationProvider {
-//
-//  val TABLE_KEY:String = "hbase.table"
-//  val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping"
-//  val BATCHING_NUM_KEY:String = "hbase.batching.num"
-//  val CACHING_NUM_KEY:String = "hbase.caching.num"
-//  val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources"
-//  val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context"
-//
-//  /**
-//   * Is given input from SparkSQL to construct a BaseRelation
-//   * @param sqlContext SparkSQL context
-//   * @param parameters Parameters given to us from SparkSQL
-//   * @return           A BaseRelation Object
-//   */
-//  override def createRelation(sqlContext: SQLContext,
-//                              parameters: Map[String, String]):
-//  BaseRelation = {
-//
-//
-//    val tableName = parameters.get(TABLE_KEY)
-//    if (tableName.isEmpty)
-//      new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + 
tableName + "'")
-//
-//    val schemaMappingString = 
parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
-//    val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000")
-//    val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000")
-//    val hbaseConfigResources = 
parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-//    val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true")
-//
-//    val batchingNum:Int = try {
-//      batchingNumStr.toInt
-//    } catch {
-//      case e:NumberFormatException => throw
-//        new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY 
+" '"
-//            + batchingNumStr + "'", e)
-//    }
-//
-//    val cachingNum:Int = try {
-//      cachingNumStr.toInt
-//    } catch {
-//      case e:NumberFormatException => throw
-//        new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY 
+" '"
-//            + cachingNumStr + "'", e)
-//    }
-//
-//    new HBaseRelation(tableName.get,
-//      generateSchemaMappingMap(schemaMappingString),
-//      batchingNum.toInt,
-//      cachingNum.toInt,
-//      hbaseConfigResources,
-//      useHBaseReources.equalsIgnoreCase("true"))(sqlContext)
-//  }
-//
-//  /**
-//   * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of
-//   * SchemaQualifierDefinitions with the original sql column name as the key
-//   * @param schemaMappingString The schema mapping string from the SparkSQL 
map
-//   * @return                    A map of definitions keyed by the SparkSQL 
column name
-//   */
-//  def generateSchemaMappingMap(schemaMappingString:String):
-//  java.util.HashMap[String, SchemaQualifierDefinition] = {
-//    try {
-//      val columnDefinitions = schemaMappingString.split(',')
-//      val resultingMap = new java.util.HashMap[String, 
SchemaQualifierDefinition]()
-//      columnDefinitions.map(cd => {
-//        val parts = cd.trim.split(' ')
-//
-//        //Make sure we get three parts
-//        //<ColumnName> <ColumnType> <ColumnFamily:Qualifier>
-//        if (parts.length == 3) {
-//          val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') {
-//            Array[String]("", "key")
-//          } else {
-//            parts(2).split(':')
-//          }
-//          resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0),
-//            parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1)))
-//        } else {
-//          throw new IllegalArgumentException("Invalid value for schema 
mapping '" + cd +
-//            "' should be '<columnName> <columnType> 
<columnFamily>:<qualifier>' " +
-//            "for columns and '<columnName> <columnType> :<qualifier>' for 
rowKeys")
-//        }
-//      })
-//      resultingMap
-//    } catch {
-//      case e:Exception => throw
-//        new IllegalArgumentException("Invalid value for " + 
SCHEMA_COLUMNS_MAPPING_KEY +
-//          " '" + schemaMappingString + "'", e )
-//    }
-//  }
-//}
-//
-///**
-// * Implementation of Spark BaseRelation that will build up our scan logic
-// * , do the scan pruning, filter push down, and value conversions
-// *
-// * @param tableName               HBase table that we plan to read from
-// * @param schemaMappingDefinition SchemaMapping information to map HBase
-// *                                Qualifiers to SparkSQL columns
-// * @param batchingNum             The batching number to be applied to the
-// *                                scan object
-// * @param cachingNum              The caching number to be applied to the
-// *                                scan object
-// * @param configResources         Optional comma separated list of config 
resources
-// *                                to get based on their URI
-// * @param useHBaseContext         If true this will look to see if
-// *                                HBaseContext.latest is populated to use 
that
-// *                                connection information
-// * @param sqlContext              SparkSQL context
-// */
-//class HBaseRelation (val tableName:String,
-//                     val schemaMappingDefinition:
-//                     java.util.HashMap[String, SchemaQualifierDefinition],
-//                     val batchingNum:Int,
-//                     val cachingNum:Int,
-//                     val configResources:String,
-//                     val useHBaseContext:Boolean) (
-//  @transient val sqlContext:SQLContext)
-//  extends BaseRelation with PrunedFilteredScan with Logging {
-//
-//  //create or get latest HBaseContext
-//  @transient val hbaseContext:HBaseContext = if (useHBaseContext) {
-//    LatestHBaseContextCache.latest
-//  } else {
-//    val config = HBaseConfiguration.create()
-//    configResources.split(",").foreach( r => config.addResource(r))
-//    new HBaseContext(sqlContext.sparkContext, config)
-//  }
-//
-//  /**
-//   * Generates a Spark SQL schema object so Spark SQL knows what is being
-//   * provided by this BaseRelation
-//   *
-//   * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value
-//   */
-//  override def schema: StructType = {
-//
-//    val metadataBuilder = new MetadataBuilder()
-//
-//    val structFieldArray = new 
Array[StructField](schemaMappingDefinition.size())
-//
-//    val schemaMappingDefinitionIt = 
schemaMappingDefinition.values().iterator()
-//    var indexCounter = 0
-//    while (schemaMappingDefinitionIt.hasNext) {
-//      val c = schemaMappingDefinitionIt.next()
-//
-//      val metadata = metadataBuilder.putString("name", c.columnName).build()
-//      val structField =
-//        new StructField(c.columnName, c.columnSparkSqlType, nullable = true, 
metadata)
-//
-//      structFieldArray(indexCounter) = structField
-//      indexCounter += 1
-//    }
-//
-//    val result = new StructType(structFieldArray)
-//    result
-//  }
-//
-//  /**
-//   * Here we are building the functionality to populate the resulting 
RDD[Row]
-//   * Here is where we will do the following:
-//   * - Filter push down
-//   * - Scan or GetList pruning
-//   * - Executing our scan(s) or/and GetList to generate result
-//   *
-//   * @param requiredColumns The columns that are being requested by the 
requesting query
-//   * @param filters         The filters that are being applied by the 
requesting query
-//   * @return                RDD will all the results from HBase needed for 
SparkSQL to
-//   *                        execute the query on
-//   */
-//  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
-//
-//    val columnFilterCollection = buildColumnFilterCollection(filters)
-//
-//    val requiredQualifierDefinitionArray = new 
mutable.MutableList[SchemaQualifierDefinition]
-//    requiredColumns.foreach( c => {
-//      val definition = schemaMappingDefinition.get(c)
-//      if (definition.columnFamilyBytes.length > 0) {
-//        requiredQualifierDefinitionArray += definition
-//      }
-//    })
-//
-//    //Create a local variable so that scala doesn't have to
-//    // serialize the whole HBaseRelation Object
-//    val serializableDefinitionMap = schemaMappingDefinition
-//
-//
-//    //retain the information for unit testing checks
-//    
DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection,
-//      requiredQualifierDefinitionArray)
-//    var resultRDD: RDD[Row] = null
-//
-//    if (columnFilterCollection != null) {
-//      val pushDownFilterJava =
-//        new SparkSQLPushDownFilter(
-//          
columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition))
-//
-//      val getList = new util.ArrayList[Get]()
-//      val rddList = new util.ArrayList[RDD[Row]]()
-//
-//      val it = columnFilterCollection.columnFilterMap.iterator
-//      while (it.hasNext) {
-//        val e = it.next()
-//        val columnDefinition = schemaMappingDefinition.get(e._1)
-//        //check is a rowKey
-//        if (columnDefinition != null && 
columnDefinition.columnFamily.isEmpty) {
-//          //add points to getList
-//          e._2.points.foreach(p => {
-//            val get = new Get(p)
-//            requiredQualifierDefinitionArray.foreach( d =>
-//              get.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-//            getList.add(get)
-//          })
-//
-//          val rangeIt = e._2.ranges.iterator
-//
-//          while (rangeIt.hasNext) {
-//            val r = rangeIt.next()
-//
-//            val scan = new Scan()
-//            scan.setBatch(batchingNum)
-//            scan.setCaching(cachingNum)
-//            requiredQualifierDefinitionArray.foreach( d =>
-//              scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-//
-//            if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 
0) {
-//              scan.setFilter(pushDownFilterJava)
-//            }
-//
-//            //Check if there is a lower bound
-//            if (r.lowerBound != null && r.lowerBound.length > 0) {
-//
-//              if (r.isLowerBoundEqualTo) {
-//                //HBase startRow is inclusive: Therefore it acts like  
isLowerBoundEqualTo
-//                // by default
-//                scan.setStartRow(r.lowerBound)
-//              } else {
-//                //Since we don't equalTo we want the next value we need
-//                // to add another byte to the start key.  That new byte will 
be
-//                // the min byte value.
-//                val newArray = new Array[Byte](r.lowerBound.length + 1)
-//                System.arraycopy(r.lowerBound, 0, newArray, 0, 
r.lowerBound.length)
-//
-//                //new Min Byte
-//                newArray(r.lowerBound.length) = Byte.MinValue
-//                scan.setStartRow(newArray)
-//              }
-//            }
-//
-//            //Check if there is a upperBound
-//            if (r.upperBound != null && r.upperBound.length > 0) {
-//              if (r.isUpperBoundEqualTo) {
-//                //HBase stopRow is exclusive: therefore it DOESN'T ast like 
isUpperBoundEqualTo
-//                // by default.  So we need to add a new max byte to the 
stopRow key
-//                val newArray = new Array[Byte](r.upperBound.length + 1)
-//                System.arraycopy(r.upperBound, 0, newArray, 0, 
r.upperBound.length)
-//
-//                //New Max Bytes
-//                newArray(r.upperBound.length) = Byte.MaxValue
-//
-//                scan.setStopRow(newArray)
-//              } else {
-//                //Here equalTo is false for Upper bound which is exclusive 
and
-//                // HBase stopRow acts like that by default so no need to 
mutate the
-//                // rowKey
-//                scan.setStopRow(r.upperBound)
-//              }
-//            }
-//
-//            val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), 
scan).map(r => {
-//              Row.fromSeq(requiredColumns.map(c =>
-//                DefaultSourceStaticUtils.getValue(c, 
serializableDefinitionMap, r._2)))
-//            })
-//            rddList.add(rdd)
-//          }
-//        }
-//      }
-//
-//      //If there is more then one RDD then we have to union them together
-//      for (i <- 0 until rddList.size()) {
-//        if (resultRDD == null) resultRDD = rddList.get(i)
-//        else resultRDD = resultRDD.union(rddList.get(i))
-//
-//      }
-//
-//      //If there are gets then we can get them from the driver and union 
that rdd in
-//      // with the rest of the values.
-//      if (getList.size() > 0) {
-//        val connection = 
ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration)
-//        try {
-//          val table = connection.getTable(TableName.valueOf(tableName))
-//          try {
-//            val results = table.get(getList)
-//            val rowList = mutable.MutableList[Row]()
-//            for (i <- 0 until results.length) {
-//              val rowArray = requiredColumns.map(c =>
-//                DefaultSourceStaticUtils.getValue(c, 
schemaMappingDefinition, results(i)))
-//              rowList += Row.fromSeq(rowArray)
-//            }
-//            val getRDD = sqlContext.sparkContext.parallelize(rowList)
-//            if (resultRDD == null) resultRDD = getRDD
-//            else {
-//              resultRDD = resultRDD.union(getRDD)
-//            }
-//          } finally {
-//            table.close()
-//          }
-//        } finally {
-//          connection.close()
-//        }
-//      }
-//    }
-//    if (resultRDD == null) {
-//      val scan = new Scan()
-//      scan.setBatch(batchingNum)
-//      scan.setCaching(cachingNum)
-//      requiredQualifierDefinitionArray.foreach( d =>
-//        scan.addColumn(d.columnFamilyBytes, d.qualifierBytes))
-//
-//      val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), 
scan).map(r => {
-//        Row.fromSeq(requiredColumns.map(c => 
DefaultSourceStaticUtils.getValue(c,
-//          serializableDefinitionMap, r._2)))
-//      })
-//      resultRDD=rdd
-//    }
-//    resultRDD
-//  }
-//
-//  /**
-//   * Root recursive function that will loop over the filters provided by
-//   * SparkSQL.  Some filters are AND or OR functions and contain additional 
filters
-//   * hence the need for recursion.
-//   *
-//   * @param filters Filters provided by SparkSQL.
-//   *                Filters are joined with the AND operater
-//   * @return        A ColumnFilterCollection whish is a consolidated 
construct to
-//   *                hold the high level filter information
-//   */
-//  def buildColumnFilterCollection(filters: Array[Filter]): 
ColumnFilterCollection = {
-//    var superCollection: ColumnFilterCollection = null
-//
-//    filters.foreach( f => {
-//      val parentCollection = new ColumnFilterCollection
-//      buildColumnFilterCollection(parentCollection, f)
-//      if (superCollection == null)
-//        superCollection = parentCollection
-//      else
-//        superCollection.mergeIntersect(parentCollection)
-//    })
-//    superCollection
-//  }
-//
-//  /**
-//   * Recursive function that will work to convert Spark Filter
-//   * objects to ColumnFilterCollection
-//   *
-//   * @param parentFilterCollection Parent ColumnFilterCollection
-//   * @param filter                 Current given filter from SparkSQL
-//   */
-//  def 
buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection,
-//                                  filter:Filter): Unit = {
-//    filter match {
-//
-//      case EqualTo(attr, value) =>
-//        parentFilterCollection.mergeUnion(attr,
-//          new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr,
-//            schemaMappingDefinition, value.toString)))
-//
-//      case LessThan(attr, value) =>
-//        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-//          new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-//            schemaMappingDefinition, value.toString), false,
-//            new Array[Byte](0), true)))
-//
-//      case GreaterThan(attr, value) =>
-//        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-//        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-//          schemaMappingDefinition, value.toString), false)))
-//
-//      case LessThanOrEqual(attr, value) =>
-//        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-//        new ScanRange(DefaultSourceStaticUtils.getByteValue(attr,
-//          schemaMappingDefinition, value.toString), true,
-//          new Array[Byte](0), true)))
-//
-//      case GreaterThanOrEqual(attr, value) =>
-//        parentFilterCollection.mergeUnion(attr, new ColumnFilter(null,
-//        new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr,
-//          schemaMappingDefinition, value.toString), true)))
-//
-//      case Or(left, right) =>
-//        buildColumnFilterCollection(parentFilterCollection, left)
-//        val rightSideCollection = new ColumnFilterCollection
-//        buildColumnFilterCollection(rightSideCollection, right)
-//        parentFilterCollection.mergeUnion(rightSideCollection)
-//      case And(left, right) =>
-//        buildColumnFilterCollection(parentFilterCollection, left)
-//        val rightSideCollection = new ColumnFilterCollection
-//        buildColumnFilterCollection(rightSideCollection, right)
-//        parentFilterCollection.mergeIntersect(rightSideCollection)
-//      case _ => //nothing
-//    }
-//  }
-//}
-//
-///**
-// * Construct to contains column data that spend SparkSQL and HBase
-// *
-// * @param columnName   SparkSQL column name
-// * @param colType      SparkSQL column type
-// * @param columnFamily HBase column family
-// * @param qualifier    HBase qualifier name
-// */
-//case class SchemaQualifierDefinition(columnName:String,
-//                          colType:String,
-//                          columnFamily:String,
-//                          qualifier:String) extends Serializable {
-//  val columnFamilyBytes = Bytes.toBytes(columnFamily)
-//  val qualifierBytes = Bytes.toBytes(qualifier)
-//  val columnSparkSqlType:DataType = if (colType.equals("BOOLEAN")) 
BooleanType
-//    else if (colType.equals("TINYINT")) IntegerType
-//    else if (colType.equals("INT")) IntegerType
-//    else if (colType.equals("BIGINT")) LongType
-//    else if (colType.equals("FLOAT")) FloatType
-//    else if (colType.equals("DOUBLE")) DoubleType
-//    else if (colType.equals("STRING")) StringType
-//    else if (colType.equals("TIMESTAMP")) TimestampType
-//    else if (colType.equals("DECIMAL")) StringType 
//DataTypes.createDecimalType(precision, scale)
-//    else throw new IllegalArgumentException("Unsupported column type :" + 
colType)
-//}
-//
-///**
-// * Construct to contain a single scan ranges information.  Also
-// * provide functions to merge with other scan ranges through AND
-// * or OR operators
-// *
-// * @param upperBound          Upper bound of scan
-// * @param isUpperBoundEqualTo Include upper bound value in the results
-// * @param lowerBound          Lower bound of scan
-// * @param isLowerBoundEqualTo Include lower bound value in the results
-// */
-//class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean,
-//                var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean)
-//  extends Serializable {
-//
-//  /**
-//   * Function to merge another scan object through a AND operation
-//   * @param other Other scan object
-//   */
-//  def mergeIntersect(other:ScanRange): Unit = {
-//    val upperBoundCompare = compareRange(upperBound, other.upperBound)
-//    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-//
-//    upperBound = if (upperBoundCompare <0) upperBound else other.upperBound
-//    lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound
-//
-//    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
-//      isLowerBoundEqualTo && other.isLowerBoundEqualTo
-//    else isLowerBoundEqualTo
-//
-//    isUpperBoundEqualTo = if (upperBoundCompare == 0)
-//      isUpperBoundEqualTo && other.isUpperBoundEqualTo
-//    else isUpperBoundEqualTo
-//  }
-//
-//  /**
-//   * Function to merge another scan object through a OR operation
-//   * @param other Other scan object
-//   */
-//  def mergeUnion(other:ScanRange): Unit = {
-//
-//    val upperBoundCompare = compareRange(upperBound, other.upperBound)
-//    val lowerBoundCompare = compareRange(lowerBound, other.lowerBound)
-//
-//    upperBound = if (upperBoundCompare >0) upperBound else other.upperBound
-//    lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound
-//
-//    isLowerBoundEqualTo = if (lowerBoundCompare == 0)
-//      isLowerBoundEqualTo || other.isLowerBoundEqualTo
-//    else isLowerBoundEqualTo
-//
-//    isUpperBoundEqualTo = if (upperBoundCompare == 0)
-//      isUpperBoundEqualTo || other.isUpperBoundEqualTo
-//    else isUpperBoundEqualTo
-//  }
-//
-//  /**
-//   * Common function to see if this scan over laps with another
-//   *
-//   * Reference Visual
-//   *
-//   * A                           B
-//   * |---------------------------|
-//   *   LL--------------LU
-//   *        RL--------------RU
-//   *
-//   * A = lowest value is byte[0]
-//   * B = highest value is null
-//   * LL = Left Lower Bound
-//   * LU = Left Upper Bound
-//   * RL = Right Lower Bound
-//   * RU = Right Upper Bound
-//   *
-//   * @param other Other scan object
-//   * @return      True is overlap false is not overlap
-//   */
-//  def doesOverLap(other:ScanRange): Boolean = {
-//
-//    var leftRange:ScanRange = null
-//    var rightRange:ScanRange = null
-//
-//    //First identify the Left range
-//    // Also lower bound can't be null
-//    if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) {
-//      leftRange = this
-//      rightRange = other
-//    } else {
-//      leftRange = other
-//      rightRange = this
-//    }
-//
-//    //Then see if leftRange goes to null or if leftRange.upperBound
-//    // upper is greater or equals to rightRange.lowerBound
-//    leftRange.upperBound == null ||
-//      Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0
-//  }
-//
-//  /**
-//   * Special compare logic because we can have null values
-//   * for left or right bound
-//   *
-//   * @param left  Left byte array
-//   * @param right Right byte array
-//   * @return      0 for equals 1 is left is greater and -1 is right is 
greater
-//   */
-//  def compareRange(left:Array[Byte], right:Array[Byte]): Int = {
-//    if (left == null && right == null) 0
-//    else if (left == null && right != null) 1
-//    else if (left != null && right == null) -1
-//    else Bytes.compareTo(left, right)
-//  }
-//  override def toString:String = {
-//    "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + 
"," +
-//      Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")"
-//  }
-//}
-//
-///**
-// * Contains information related to a filters for a given column.
-// * This can contain many ranges or points.
-// *
-// * @param currentPoint the initial point when the filter is created
-// * @param currentRange the initial scanRange when the filter is created
-// */
-//class ColumnFilter (currentPoint:Array[Byte] = null,
-//                     currentRange:ScanRange = null,
-//                     var points:mutable.MutableList[Array[Byte]] =
-//                     new mutable.MutableList[Array[Byte]](),
-//                     var ranges:mutable.MutableList[ScanRange] =
-//                     new mutable.MutableList[ScanRange]() ) extends 
Serializable {
-//  //Collection of ranges
-//  if (currentRange != null ) ranges.+=(currentRange)
-//
-//  //Collection of points
-//  if (currentPoint != null) points.+=(currentPoint)
-//
-//  /**
-//   * This will validate a give value through the filter's points and/or 
ranges
-//   * the result will be if the value passed the filter
-//   *
-//   * @param value       Value to be validated
-//   * @param valueOffSet The offset of the value
-//   * @param valueLength The length of the value
-//   * @return            True is the value passes the filter false if not
-//   */
-//  def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean 
= {
-//    var result = false
-//
-//    points.foreach( p => {
-//      if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) {
-//        result = true
-//      }
-//    })
-//
-//    ranges.foreach( r => {
-//      val upperBoundPass = r.upperBound == null ||
-//        (r.isUpperBoundEqualTo &&
-//          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-//            value, valueOffSet, valueLength) >= 0) ||
-//        (!r.isUpperBoundEqualTo &&
-//          Bytes.compareTo(r.upperBound, 0, r.upperBound.length,
-//            value, valueOffSet, valueLength) > 0)
-//
-//      val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0
-//        (r.isLowerBoundEqualTo &&
-//          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-//            value, valueOffSet, valueLength) <= 0) ||
-//        (!r.isLowerBoundEqualTo &&
-//          Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length,
-//            value, valueOffSet, valueLength) < 0)
-//
-//      result = result || (upperBoundPass && lowerBoundPass)
-//    })
-//    result
-//  }
-//
-//  /**
-//   * This will allow us to merge filter logic that is joined to the existing 
filter
-//   * through a OR operator
-//   *
-//   * @param other Filter to merge
-//   */
-//  def mergeUnion(other:ColumnFilter): Unit = {
-//    other.points.foreach( p => points += p)
-//
-//    other.ranges.foreach( otherR => {
-//      var doesOverLap = false
-//      ranges.foreach{ r =>
-//        if (r.doesOverLap(otherR)) {
-//          r.mergeUnion(otherR)
-//          doesOverLap = true
-//        }}
-//      if (!doesOverLap) ranges.+=(otherR)
-//    })
-//  }
-//
-//  /**
-//   * This will allow us to merge filter logic that is joined to the existing 
filter
-//   * through a AND operator
-//   *
-//   * @param other Filter to merge
-//   */
-//  def mergeIntersect(other:ColumnFilter): Unit = {
-//    val survivingPoints = new mutable.MutableList[Array[Byte]]()
-//    points.foreach( p => {
-//      other.points.foreach( otherP => {
-//        if (Bytes.equals(p, otherP)) {
-//          survivingPoints.+=(p)
-//        }
-//      })
-//    })
-//    points = survivingPoints
-//
-//    val survivingRanges = new mutable.MutableList[ScanRange]()
-//
-//    other.ranges.foreach( otherR => {
-//      ranges.foreach( r => {
-//        if (r.doesOverLap(otherR)) {
-//          r.mergeIntersect(otherR)
-//          survivingRanges += r
-//        }
-//      })
-//    })
-//    ranges = survivingRanges
-//  }
-//
-//  override def toString:String = {
-//    val strBuilder = new StringBuilder
-//    strBuilder.append("(points:(")
-//    var isFirst = true
-//    points.foreach( p => {
-//      if (isFirst) isFirst = false
-//      else strBuilder.append(",")
-//      strBuilder.append(Bytes.toString(p))
-//    })
-//    strBuilder.append("),ranges:")
-//    isFirst = true
-//    ranges.foreach( r => {
-//      if (isFirst) isFirst = false
-//      else strBuilder.append(",")
-//      strBuilder.append(r)
-//    })
-//    strBuilder.append("))")
-//    strBuilder.toString()
-//  }
-//}
-//
-///**
-// * A collection of ColumnFilters indexed by column names.
-// *
-// * Also contains merge commends that will consolidate the filters
-// * per column name
-// */
-//class ColumnFilterCollection {
-//  val columnFilterMap = new mutable.HashMap[String, ColumnFilter]
-//
-//  def clear(): Unit = {
-//    columnFilterMap.clear()
-//  }
-//
-//  /**
-//   * This will allow us to merge filter logic that is joined to the existing 
filter
-//   * through a OR operator.  This will merge a single columns filter
-//   *
-//   * @param column The column to be merged
-//   * @param other  The other ColumnFilter object to merge
-//   */
-//  def mergeUnion(column:String, other:ColumnFilter): Unit = {
-//    val existingFilter = columnFilterMap.get(column)
-//    if (existingFilter.isEmpty) {
-//      columnFilterMap.+=((column, other))
-//    } else {
-//      existingFilter.get.mergeUnion(other)
-//    }
-//  }
-//
-//  /**
-//   * This will allow us to merge all filters in the existing collection
-//   * to the filters in the other collection.  All merges are done as a result
-//   * of a OR operator
-//   *
-//   * @param other The other Column Filter Collection to be merged
-//   */
-//  def mergeUnion(other:ColumnFilterCollection): Unit = {
-//    other.columnFilterMap.foreach( e => {
-//      mergeUnion(e._1, e._2)
-//    })
-//  }
-//
-//  /**
-//   * This will allow us to merge all filters in the existing collection
-//   * to the filters in the other collection.  All merges are done as a result
-//   * of a AND operator
-//   *
-//   * @param other The column filter from the other collection
-//   */
-//  def mergeIntersect(other:ColumnFilterCollection): Unit = {
-//    other.columnFilterMap.foreach( e => {
-//      val existingColumnFilter = columnFilterMap.get(e._1)
-//      if (existingColumnFilter.isEmpty) {
-//        columnFilterMap += e
-//      } else {
-//        existingColumnFilter.get.mergeIntersect(e._2)
-//      }
-//    })
-//  }
-//
-//  /**
-//   * This will collect all the filter information in a way that is optimized
-//   * for the HBase filter commend.  Allowing the filter to be accessed
-//   * with columnFamily and qualifier information
-//   *
-//   * @param schemaDefinitionMap Schema Map that will help us map the right 
filters
-//   *                            to the correct columns
-//   * @return                    HashMap oc column filters
-//   */
-//  def generateFamilyQualifiterFilterMap(schemaDefinitionMap:
-//                                        java.util.HashMap[String,
-//                                          SchemaQualifierDefinition]):
-//  util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter] = {
-//    val familyQualifierFilterMap =
-//      new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]()
-//
-//    columnFilterMap.foreach( e => {
-//      val definition = schemaDefinitionMap.get(e._1)
-//      //Don't add rowKeyFilter
-//      if (definition.columnFamilyBytes.size > 0) {
-//        familyQualifierFilterMap.put(
-//          new ColumnFamilyQualifierMapKeyWrapper(
-//            definition.columnFamilyBytes, 0, 
definition.columnFamilyBytes.length,
-//            definition.qualifierBytes, 0, definition.qualifierBytes.length), 
e._2)
-//      }
-//    })
-//    familyQualifierFilterMap
-//  }
-//
-//  override def toString:String = {
-//    val strBuilder = new StringBuilder
-//    columnFilterMap.foreach( e => strBuilder.append(e))
-//    strBuilder.toString()
-//  }
-//}
-//
-///**
-// * Status object to store static functions but also to hold last executed
-// * information that can be used for unit testing.
-// */
-//object DefaultSourceStaticUtils {
-//
-//  val rawInteger = new RawInteger
-//  val rawLong = new RawLong
-//  val rawFloat = new RawFloat
-//  val rawDouble = new RawDouble
-//  val rawString = RawString.ASCENDING
-//
-//  val byteRange = new ThreadLocal[PositionedByteRange]{
-//    override def initialValue(): PositionedByteRange = {
-//      val range = new SimplePositionedMutableByteRange()
-//      range.setOffset(0)
-//      range.setPosition(0)
-//    }
-//  }
-//
-//  def getFreshByteRange(bytes:Array[Byte]): PositionedByteRange = {
-//    getFreshByteRange(bytes, 0, bytes.length)
-//  }
-//
-//  def getFreshByteRange(bytes:Array[Byte],  offset:Int = 0, length:Int): 
PositionedByteRange = {
-//    byteRange.get().set(bytes).setLength(length).setOffset(offset)
-//  }
-//
-//  //This will contain the last 5 filters and required fields used in 
buildScan
-//  // These values can be used in unit testing to make sure we are converting
-//  // The Spark SQL input correctly
-//  val lastFiveExecutionRules =
-//    new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]()
-//
-//  /**
-//   * This method is to populate the lastFiveExecutionRules for unit test 
perposes
-//   * This method is not thread safe.
-//   *
-//   * @param columnFilterCollection           The filters in the last job
-//   * @param requiredQualifierDefinitionArray The required columns in the 
last job
-//   */
-//  def populateLatestExecutionRules(columnFilterCollection: 
ColumnFilterCollection,
-//                                   requiredQualifierDefinitionArray:
-//                                   
mutable.MutableList[SchemaQualifierDefinition]):Unit = {
-//    lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting(
-//      columnFilterCollection, requiredQualifierDefinitionArray))
-//    while (lastFiveExecutionRules.size() > 5) {
-//      lastFiveExecutionRules.poll()
-//    }
-//  }
-//
-//  /**
-//   * This method will convert the result content from HBase into the
-//   * SQL value type that is requested by the Spark SQL schema definition
-//   *
-//   * @param columnName              The name of the SparkSQL Column
-//   * @param schemaMappingDefinition The schema definition map
-//   * @param r                       The result object from HBase
-//   * @return                        The converted object type
-//   */
-//  def getValue(columnName: String,
-//               schemaMappingDefinition:
-//               java.util.HashMap[String, SchemaQualifierDefinition],
-//               r: Result): Any = {
-//
-//    val columnDef = schemaMappingDefinition.get(columnName)
-//
-//    if (columnDef == null) throw new IllegalArgumentException("Unknown 
column:" + columnName)
-//
-//
-//    if (columnDef.columnFamilyBytes.isEmpty) {
-//      val row = r.getRow
-//
-//      columnDef.columnSparkSqlType match {
-//        case IntegerType => rawInteger.decode(getFreshByteRange(row))
-//        case LongType => rawLong.decode(getFreshByteRange(row))
-//        case FloatType => rawFloat.decode(getFreshByteRange(row))
-//        case DoubleType => rawDouble.decode(getFreshByteRange(row))
-//        case StringType => rawString.decode(getFreshByteRange(row))
-//        case TimestampType => rawLong.decode(getFreshByteRange(row))
-//        case _ => Bytes.toString(row)
-//      }
-//    } else {
-//      val cellByteValue =
-//        r.getColumnLatestCell(columnDef.columnFamilyBytes, 
columnDef.qualifierBytes)
-//      if (cellByteValue == null) null
-//      else columnDef.columnSparkSqlType match {
-//        case IntegerType => 
rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-//        case LongType => 
rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-//        case FloatType => 
rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-//        case DoubleType => 
rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-//        case StringType => Bytes.toString(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-//        case TimestampType => 
rawLong.decode(getFreshByteRange(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength))
-//        case _ => Bytes.toString(cellByteValue.getValueArray,
-//          cellByteValue.getValueOffset, cellByteValue.getValueLength)
-//      }
-//    }
-//  }
-//
-//  /**
-//   * This will convert the value from SparkSQL to be stored into HBase using 
the
-//   * right byte Type
-//   *
-//   * @param columnName              SparkSQL column name
-//   * @param schemaMappingDefinition Schema definition map
-//   * @param value                   String value from SparkSQL
-//   * @return                        Returns the byte array to go into HBase
-//   */
-//  def getByteValue(columnName: String,
-//                   schemaMappingDefinition:
-//                   java.util.HashMap[String, SchemaQualifierDefinition],
-//                   value: String): Array[Byte] = {
-//
-//    val columnDef = schemaMappingDefinition.get(columnName)
-//
-//    if (columnDef == null) {
-//      throw new IllegalArgumentException("Unknown column:" + columnName)
-//    } else {
-//      columnDef.columnSparkSqlType match {
-//        case IntegerType =>
-//          val result = new Array[Byte](Bytes.SIZEOF_INT)
-//          val localDataRange = getFreshByteRange(result)
-//          rawInteger.encode(localDataRange, value.toInt)
-//          localDataRange.getBytes
-//        case LongType =>
-//          val result = new Array[Byte](Bytes.SIZEOF_LONG)
-//          val localDataRange = getFreshByteRange(result)
-//          rawLong.encode(localDataRange, value.toLong)
-//          localDataRange.getBytes
-//        case FloatType =>
-//          val result = new Array[Byte](Bytes.SIZEOF_FLOAT)
-//          val localDataRange = getFreshByteRange(result)
-//          rawFloat.encode(localDataRange, value.toFloat)
-//          localDataRange.getBytes
-//        case DoubleType =>
-//          val result = new Array[Byte](Bytes.SIZEOF_DOUBLE)
-//          val localDataRange = getFreshByteRange(result)
-//          rawDouble.encode(localDataRange, value.toDouble)
-//          localDataRange.getBytes
-//        case StringType =>
-//          Bytes.toBytes(value)
-//        case TimestampType =>
-//          val result = new Array[Byte](Bytes.SIZEOF_LONG)
-//          val localDataRange = getFreshByteRange(result)
-//          rawLong.encode(localDataRange, value.toLong)
-//          localDataRange.getBytes
-//
-//        case _ => Bytes.toBytes(value)
-//      }
-//    }
-//  }
-//}
-//
-//class ExecutionRuleForUnitTesting(val columnFilterCollection: 
ColumnFilterCollection,
-//                                  val requiredQualifierDefinitionArray:
-//                                  
mutable.MutableList[SchemaQualifierDefinition])

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala 
b/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala
deleted file mode 100644
index 2e4023c..0000000
--- a/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.Serializable
-
-/**
- * This object will hold optional data for how a given column family's
- * writer will work
- *
- * @param compression       String to define the Compression to be used in the 
HFile
- * @param bloomType         String to define the bloom type to be used in the 
HFile
- * @param blockSize         The block size to be used in the HFile
- * @param dataBlockEncoding String to define the data block encoding to be used
- *                          in the HFile
- */
-class FamilyHFileWriteOptions( val compression:String,
-                               val bloomType: String,
-                               val blockSize: Int,
-                               val dataBlockEncoding: String) extends 
Serializable

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/HBaseContext.scala 
b/loader/src/main/scala/spark/HBaseContext.scala
deleted file mode 100644
index 4ea58f1..0000000
--- a/loader/src/main/scala/spark/HBaseContext.scala
+++ /dev/null
@@ -1,849 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.net.InetSocketAddress
-import java.util
-
-import org.apache.hadoop.hbase.fs.HFileSystem
-import org.apache.hadoop.hbase._
-import org.apache.hadoop.hbase.io.compress.Compression
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder}
-import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.hadoop.conf.Configuration
-import spark.HBaseRDDFunctions._
-import org.apache.hadoop.hbase.client._
-import scala.reflect.ClassTag
-import org.apache.spark.{Logging, SerializableWritable, SparkContext}
-import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
-TableInputFormat, IdentityTableMapper}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.streaming.dstream.DStream
-import java.io._
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
-import org.apache.hadoop.fs.{Path, FileSystem}
-import scala.collection.mutable
-
-/**
-  * HBaseContext is a façade for HBase operations
-  * like bulk put, get, increment, delete, and scan
-  *
-  * HBaseContext will take the responsibilities
-  * of disseminating the configuration information
-  * to the working and managing the life cycle of HConnections.
- */
-class HBaseContext(@transient sc: SparkContext,
-                   @transient config: Configuration,
-                   val tmpHdfsConfgFile: String = null)
-  extends Serializable with Logging {
-
-  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
-  @transient var tmpHdfsConfiguration:Configuration = config
-  @transient var appliedCredentials = false
-  @transient val job = Job.getInstance(config)
-  TableMapReduceUtil.initCredentials(job)
-  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
-  val credentialsConf = sc.broadcast(new 
SerializableWritable(job.getCredentials))
-
-  LatestHBaseContextCache.latest = this
-
-  if (tmpHdfsConfgFile != null && config != null) {
-    val fs = FileSystem.newInstance(config)
-    val tmpPath = new Path(tmpHdfsConfgFile)
-    if (!fs.exists(tmpPath)) {
-      val outputStream = fs.create(tmpPath)
-      config.write(outputStream)
-      outputStream.close()
-    } else {
-      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
-    }
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark RDD foreachPartition.
-   * This function differs from the original in that it offers the
-   * developer access to a already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * @param rdd  Original RDD with data to iterate over
-   * @param f    Function to be given a iterator to iterate through
-   *             the RDD values and a HConnection object to interact
-   *             with HBase
-   */
-  def foreachPartition[T](rdd: RDD[T],
-                          f: (Iterator[T], Connection) => Unit):Unit = {
-    rdd.foreachPartition(
-      it => hbaseForeachPartition(broadcastedConf, it, f))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming dStream foreach
-   * This function differs from the original in that it offers the
-   * developer access to a already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f        Function to be given a iterator to iterate through
-   *                 the DStream values and a HConnection object to
-   *                 interact with HBase
-   */
-  def foreachPartition[T](dstream: DStream[T],
-                    f: (Iterator[T], Connection) => Unit):Unit = {
-    dstream.foreachRDD((rdd, time) => {
-      foreachPartition(rdd, f)
-    })
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark RDD mapPartition.
-   * This function differs from the original in that it offers the
-   * developer access to a already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * @param rdd  Original RDD with data to iterate over
-   * @param mp   Function to be given a iterator to iterate through
-   *             the RDD values and a HConnection object to interact
-   *             with HBase
-   * @return     Returns a new RDD generated by the user definition
-   *             function just like normal mapPartition
-   */
-  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
-                                   mp: (Iterator[T], Connection) => 
Iterator[R]): RDD[R] = {
-
-    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
-      it,
-      mp))
-
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming DStream
-   * foreachPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to a already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   *       getting data from HBase
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f       Function to be given a iterator to iterate through
-   *                 the DStream values and a HConnection object to
-   *                 interact with HBase
-   * @return         Returns a new DStream generated by the user
-   *                 definition function just like normal mapPartition
-   */
-  def streamForeachPartition[T](dstream: DStream[T],
-                                f: (Iterator[T], Connection) => Unit): Unit = {
-
-    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
-  }
-
-  /**
-   * A simple enrichment of the traditional Spark Streaming DStream
-   * mapPartition.
-   *
-   * This function differs from the original in that it offers the
-   * developer access to a already connected HConnection object
-   *
-   * Note: Do not close the HConnection object.  All HConnection
-   * management is handled outside this method
-   *
-   * Note: Make sure to partition correctly to avoid memory issue when
-   *       getting data from HBase
-   *
-   * @param dstream  Original DStream with data to iterate over
-   * @param f       Function to be given a iterator to iterate through
-   *                 the DStream values and a HConnection object to
-   *                 interact with HBase
-   * @return         Returns a new DStream generated by the user
-   *                 definition function just like normal mapPartition
-   */
-  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
-                                f: (Iterator[T], Connection) => Iterator[U]):
-  DStream[U] = {
-    dstream.mapPartitions(it => hbaseMapPartition[T, U](
-      broadcastedConf,
-      it,
-      f))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allow addition support for a user to take RDD
-   * and generate puts and send them to HBase.
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param rdd       Original RDD with data to iterate over
-   * @param tableName The name of the table to put into
-   * @param f         Function to convert a value in the RDD to a HBase Put
-   */
-  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
-
-    val tName = tableName.getName
-    rdd.foreachPartition(
-      it => hbaseForeachPartition[T](
-        broadcastedConf,
-        it,
-        (iterator, connection) => {
-          val m = connection.getBufferedMutator(TableName.valueOf(tName))
-          iterator.foreach(T => m.mutate(f(T)))
-          m.flush()
-          m.close()
-        }))
-  }
-
-  def applyCreds[T] (configBroadcast: 
Broadcast[SerializableWritable[Configuration]]){
-    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
-
-    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + 
credentials)
-
-    if (!appliedCredentials && credentials != null) {
-      appliedCredentials = true
-
-      @transient val ugi = UserGroupInformation.getCurrentUser
-      ugi.addCredentials(credentials)
-      // specify that this is a proxy user
-      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
-
-      ugi.addCredentials(credentialsConf.value.value)
-    }
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMapPartition method.
-   *
-   * It allow addition support for a user to take a DStream and
-   * generate puts and send them to HBase.
-   *
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param dstream    Original DStream with data to iterate over
-   * @param tableName  The name of the table to put into
-   * @param f          Function to convert a value in
-   *                   the DStream to a HBase Put
-   */
-  def streamBulkPut[T](dstream: DStream[T],
-                       tableName: TableName,
-                       f: (T) => Put) = {
-    val tName = tableName.getName
-    dstream.foreachRDD((rdd, time) => {
-      bulkPut(rdd, TableName.valueOf(tName), f)
-    })
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.foreachPartition method.
-   *
-   * It allow addition support for a user to take a RDD and generate delete
-   * and send them to HBase.  The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param rdd       Original RDD with data to iterate over
-   * @param tableName The name of the table to delete from
-   * @param f         Function to convert a value in the RDD to a
-   *                  HBase Deletes
-   * @param batchSize       The number of delete to batch before sending to 
HBase
-   */
-  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
-                    f: (T) => Delete, batchSize: Integer) {
-    bulkMutation(rdd, tableName, f, batchSize)
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamBulkMutation method.
-   *
-   * It allow addition support for a user to take a DStream and
-   * generate Delete and send them to HBase.
-   *
-   * The complexity of managing the HConnection is
-   * removed from the developer
-   *
-   * @param dstream    Original DStream with data to iterate over
-   * @param tableName  The name of the table to delete from
-   * @param f          function to convert a value in the DStream to a
-   *                   HBase Delete
-   * @param batchSize        The number of deletes to batch before sending to 
HBase
-   */
-  def streamBulkDelete[T](dstream: DStream[T],
-                          tableName: TableName,
-                          f: (T) => Delete,
-                          batchSize: Integer) = {
-    streamBulkMutation(dstream, tableName, f, batchSize)
-  }
-
-  /**
-   *  Under lining function to support all bulk mutations
-   *
-   *  May be opened up if requested
-   */
-  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
-                              f: (T) => Mutation, batchSize: Integer) {
-
-    val tName = tableName.getName
-    rdd.foreachPartition(
-      it => hbaseForeachPartition[T](
-        broadcastedConf,
-        it,
-        (iterator, connection) => {
-          val table = connection.getTable(TableName.valueOf(tName))
-          val mutationList = new java.util.ArrayList[Mutation]
-          iterator.foreach(T => {
-            mutationList.add(f(T))
-            if (mutationList.size >= batchSize) {
-              table.batch(mutationList, null)
-              mutationList.clear()
-            }
-          })
-          if (mutationList.size() > 0) {
-            table.batch(mutationList, null)
-            mutationList.clear()
-          }
-          table.close()
-        }))
-  }
-
-  /**
-   *  Under lining function to support all bulk streaming mutations
-   *
-   *  May be opened up if requested
-   */
-  private def streamBulkMutation[T](dstream: DStream[T],
-                                    tableName: TableName,
-                                    f: (T) => Mutation,
-                                    batchSize: Integer) = {
-    val tName = tableName.getName
-    dstream.foreachRDD((rdd, time) => {
-      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
-    })
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.mapPartition method.
-   *
-   * It allow addition support for a user to take a RDD and generates a
-   * new RDD based on Gets and the results they bring back from HBase
-   *
-   * @param rdd     Original RDD with data to iterate over
-   * @param tableName        The name of the table to get from
-   * @param makeGet    function to convert a value in the RDD to a
-   *                   HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                   what ever the user wants to put in the resulting
-   *                   RDD
-   * return            new RDD that is created by the Get to HBase
-   */
-  def bulkGet[T, U: ClassTag](tableName: TableName,
-                    batchSize: Integer,
-                    rdd: RDD[T],
-                    makeGet: (T) => Get,
-                    convertResult: (Result) => U): RDD[U] = {
-
-    val getMapPartition = new GetMapPartition(tableName,
-      batchSize,
-      makeGet,
-      convertResult)
-
-    rdd.mapPartitions[U](it =>
-      hbaseMapPartition[T, U](
-        broadcastedConf,
-        it,
-        getMapPartition.run))
-  }
-
-  /**
-   * A simple abstraction over the HBaseContext.streamMap method.
-   *
-   * It allow addition support for a user to take a DStream and
-   * generates a new DStream based on Gets and the results
-   * they bring back from HBase
-   *
-   * @param tableName     The name of the table to get from
-   * @param batchSize     The number of Gets to be sent in a single batch
-   * @param dStream       Original DStream with data to iterate over
-   * @param makeGet       Function to convert a value in the DStream to a
-   *                      HBase Get
-   * @param convertResult This will convert the HBase Result object to
-   *                      what ever the user wants to put in the resulting
-   *                      DStream
-   * @return              A new DStream that is created by the Get to HBase
-   */
-  def streamBulkGet[T, U: ClassTag](tableName: TableName,
-                                    batchSize: Integer,
-                                    dStream: DStream[T],
-                                    makeGet: (T) => Get,
-                                    convertResult: (Result) => U): DStream[U] 
= {
-
-    val getMapPartition = new GetMapPartition(tableName,
-      batchSize,
-      makeGet,
-      convertResult)
-
-    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
-      broadcastedConf,
-      it,
-      getMapPartition.run))
-  }
-
-  /**
-   * This function will use the native HBase TableInputFormat with the
-   * given scan object to generate a new RDD
-   *
-   *  @param tableName the name of the table to scan
-   *  @param scan      the HBase scan object to use to read data from HBase
-   *  @param f         function to convert a Result object from HBase into
-   *                   what the user wants in the final generated RDD
-   *  @return          new RDD with results from scan
-   */
-  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
-                            f: ((ImmutableBytesWritable, Result)) => U): 
RDD[U] = {
-
-    val job: Job = Job.getInstance(getConf(broadcastedConf))
-
-    TableMapReduceUtil.initCredentials(job)
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-      classOf[IdentityTableMapper], null, null, job)
-
-    sc.newAPIHadoopRDD(job.getConfiguration,
-      classOf[TableInputFormat],
-      classOf[ImmutableBytesWritable],
-      classOf[Result]).map(f)
-  }
-
-  /**
-   * A overloaded version of HBaseContext hbaseRDD that defines the
-   * type of the resulting RDD
-   *
-   *  @param tableName the name of the table to scan
-   *  @param scans     the HBase scan object to use to read data from HBase
-   *  @return          New RDD with results from scan
-   *
-   */
-  def hbaseRDD(tableName: TableName, scans: Scan):
-  RDD[(ImmutableBytesWritable, Result)] = {
-
-    hbaseRDD[(ImmutableBytesWritable, Result)](
-      tableName,
-      scans,
-      (r: (ImmutableBytesWritable, Result)) => r)
-  }
-
-  /**
-   *  underlining wrapper all foreach functions in HBaseContext
-   */
-  private def hbaseForeachPartition[T](configBroadcast:
-                                       
Broadcast[SerializableWritable[Configuration]],
-                                        it: Iterator[T],
-                                        f: (Iterator[T], Connection) => Unit) 
= {
-
-    val config = getConf(configBroadcast)
-
-    applyCreds(configBroadcast)
-    // specify that this is a proxy user
-    val connection = ConnectionFactory.createConnection(config)
-    f(it, connection)
-    connection.close()
-  }
-
-  private def getConf(configBroadcast: 
Broadcast[SerializableWritable[Configuration]]):
-  Configuration = {
-
-    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
-      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
-      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
-      tmpHdfsConfiguration = new Configuration(false)
-      tmpHdfsConfiguration.readFields(inputStream)
-      inputStream.close()
-    }
-
-    if (tmpHdfsConfiguration == null) {
-      try {
-        tmpHdfsConfiguration = configBroadcast.value.value
-      } catch {
-        case ex: Exception => logError("Unable to getConfig from broadcast", 
ex)
-      }
-    }
-    tmpHdfsConfiguration
-  }
-
-  /**
-   *  underlining wrapper all mapPartition functions in HBaseContext
-   *
-   */
-  private def hbaseMapPartition[K, U](
-                                       configBroadcast:
-                                       
Broadcast[SerializableWritable[Configuration]],
-                                       it: Iterator[K],
-                                       mp: (Iterator[K], Connection) =>
-                                         Iterator[U]): Iterator[U] = {
-
-    val config = getConf(configBroadcast)
-    applyCreds(configBroadcast)
-
-    val connection = ConnectionFactory.createConnection(config)
-    val res = mp(it, connection)
-    connection.close()
-    res
-
-  }
-
-  /**
-   *  underlining wrapper all get mapPartition functions in HBaseContext
-   */
-  private class GetMapPartition[T, U](tableName: TableName,
-                                      batchSize: Integer,
-                                      makeGet: (T) => Get,
-                                      convertResult: (Result) => U)
-    extends Serializable {
-
-    val tName = tableName.getName
-
-    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
-      val table = connection.getTable(TableName.valueOf(tName))
-
-      val gets = new java.util.ArrayList[Get]()
-      var res = List[U]()
-
-      while (iterator.hasNext) {
-        gets.add(makeGet(iterator.next()))
-
-        if (gets.size() == batchSize) {
-          val results = table.get(gets)
-          res = res ++ results.map(convertResult)
-          gets.clear()
-        }
-      }
-      if (gets.size() > 0) {
-        val results = table.get(gets)
-        res = res ++ results.map(convertResult)
-        gets.clear()
-      }
-      table.close()
-      res.iterator
-    }
-  }
-
-  /**
-   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
-   *
-   * This method is used to keep ClassTags out of the external Java API, as
-   * the Java compiler cannot produce them automatically. While this
-   * ClassTag-faking does please the compiler, it can cause problems at runtime
-   * if the Scala API relies on ClassTags for correctness.
-   *
-   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
-   * just worse performance or security issues.
-   * For instance, an Array of AnyRef can hold any type T, but may lose 
primitive
-   * specialization.
-   */
-  private[spark]
-  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
-
-  /**
-   * A Spark Implementation of HBase Bulk load
-   *
-   * This will take the content from an existing RDD then sort and shuffle
-   * it with respect to region splits.  The result of that sort and shuffle
-   * will be written to HFiles.
-   *
-   * After this function is executed the user will have to call
-   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
-   *
-   * Also note this version of bulk load is different from past versions in
-   * that it includes the qualifier as part of the sort process. The
-   * reason for this is to be able to support rows will very large number
-   * of columns.
-   *
-   * @param rdd                            The RDD we are bulk loading from
-   * @param tableName                      The HBase table we are loading into
-   * @param flatMap                        A flapMap function that will make 
every
-   *                                       row in the RDD
-   *                                       into N cells for the bulk load
-   * @param stagingDir                     The location on the FileSystem to 
bulk load into
-   * @param familyHFileWriteOptionsMap     Options that will define how the 
HFile for a
-   *                                       column family is written
-   * @param compactionExclude              Compaction excluded for the HFiles
-   * @param maxSize                        Max size for the HFiles before they 
roll
-   * @tparam T                             The Type of values in the original 
RDD
-   */
-  def bulkLoad[T](rdd:RDD[T],
-                  tableName: TableName,
-                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
-                  stagingDir:String,
-                  familyHFileWriteOptionsMap:
-                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
-                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
-                  compactionExclude: Boolean = false,
-                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
-  Unit = {
-    val conn = ConnectionFactory.createConnection(config)
-    val regionLocator = conn.getRegionLocator(tableName)
-    val startKeys = regionLocator.getStartKeys
-    val defaultCompressionStr = config.get("hfile.compression",
-      Compression.Algorithm.NONE.getName)
-    val defaultCompression = 
Compression.getCompressionAlgorithmByName(defaultCompressionStr)
-//      HFileWriterImpl
-//      .compressionByName(defaultCompressionStr)
-    val now = System.currentTimeMillis()
-    val tableNameByteArray = tableName.getName
-
-    val familyHFileWriteOptionsMapInternal =
-      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
-
-    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
-
-    while (entrySetIt.hasNext) {
-      val entry = entrySetIt.next()
-      familyHFileWriteOptionsMapInternal.put(new 
ByteArrayWrapper(entry.getKey), entry.getValue)
-    }
-
-    /**
-     *  This will return a new HFile writer when requested
-     *
-     * @param family       column family
-     * @param conf         configuration to connect to HBase
-     * @param favoredNodes nodes that we would like to write too
-     * @param fs           FileSystem object where we will be writing the 
HFiles to
-     * @return WriterLength object
-     */
-    def getNewWriter(family: Array[Byte], conf: Configuration,
-                     favoredNodes: Array[InetSocketAddress],
-                     fs:FileSystem,
-                     familydir:Path): WriterLength = {
-
-
-      var familyOptions = familyHFileWriteOptionsMapInternal.get(new 
ByteArrayWrapper(family))
-
-      if (familyOptions == null) {
-        familyOptions = new 
FamilyHFileWriteOptions(defaultCompression.toString,
-          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, 
DataBlockEncoding.NONE.toString)
-        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), 
familyOptions)
-      }
-
-      val tempConf = new Configuration(conf)
-      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
-      val contextBuilder = new HFileContextBuilder()
-        .withCompression(Algorithm.valueOf(familyOptions.compression))
-        .withChecksumType(HStore.getChecksumType(conf))
-        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-        .withBlockSize(familyOptions.blockSize)
-      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
-        valueOf(familyOptions.dataBlockEncoding))
-      val hFileContext = contextBuilder.build()
-
-      if (null == favoredNodes) {
-        new WriterLength(0, new StoreFile.WriterBuilder(conf, new 
CacheConfig(tempConf), fs)
-          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-          
.withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build())
-//          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
-      } else {
-        new WriterLength(0,
-          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new 
HFileSystem(fs))
-          
.withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
-            .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
-//          
.withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-          .withFavoredNodes(favoredNodes).build())
-      }
-    }
-
-    val regionSplitPartitioner =
-      new BulkLoadPartitioner(startKeys)
-
-    //This is where all the magic happens
-    //Here we are going to do the following things
-    // 1. FlapMap every row in the RDD into key column value tuples
-    // 2. Then we are going to repartition sort and shuffle
-    // 3. Finally we are going to write out our HFiles
-    rdd.flatMap( r => flatMap(r)).
-      repartitionAndSortWithinPartitions(regionSplitPartitioner).
-      hbaseForeachPartition(this, (it, conn) => {
-
-      val conf = broadcastedConf.value.value
-      val fs = FileSystem.get(conf)
-      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
-      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
-      var rollOverRequested = false
-
-      /**
-       * This will roll all writers
-       */
-      def rollWriters(): Unit = {
-        writerMap.values.foreach( wl => {
-          if (wl.writer != null) {
-            logDebug("Writer=" + wl.writer.getPath +
-              (if (wl.written == 0) "" else ", wrote=" + wl.written))
-            close(wl.writer)
-          }
-        })
-        writerMap.clear()
-        rollOverRequested = false
-      }
-
-      /**
-       * This function will close a given HFile writer
-       * @param w The writer to close
-       */
-      def close(w:StoreFile.Writer): Unit = {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-            Bytes.toBytes(System.currentTimeMillis()))
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-            Bytes.toBytes(true))
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-            Bytes.toBytes(compactionExclude))
-          w.appendTrackedTimestampsToMetadata()
-          w.close()
-        }
-      }
-
-      //Here is where we finally iterate through the data in this partition of 
the
-      //RDD that has been sorted and partitioned
-      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
-
-        //This will get a writer for the column family
-        //If there is no writer for a given column family then
-        //it will get created here.
-        val wl = writerMap.getOrElseUpdate(new 
ByteArrayWrapper(keyFamilyQualifier.family), {
-
-          val familyDir = new Path(stagingDir, 
Bytes.toString(keyFamilyQualifier.family))
-
-          fs.mkdirs(familyDir)
-
-          val loc:HRegionLocation = {
-            try {
-              val locator =
-                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
-              locator.getRegionLocation(keyFamilyQualifier.rowKey)
-            } catch {
-              case e: Throwable =>
-              logWarning("there's something wrong when locating rowkey: " +
-                Bytes.toString(keyFamilyQualifier.rowKey))
-                null
-            }
-          }
-          if (null == loc) {
-            if (log.isTraceEnabled) {
-              logTrace("failed to get region location, so use default writer: 
" +
-                Bytes.toString(keyFamilyQualifier.rowKey))
-            }
-            getNewWriter(family = keyFamilyQualifier.family, conf = conf, 
favoredNodes = null,
-              fs = fs, familydir = familyDir)
-          } else {
-            if (log.isDebugEnabled) {
-              logDebug("first rowkey: [" + 
Bytes.toString(keyFamilyQualifier.rowKey) + "]")
-            }
-            val initialIsa =
-              new InetSocketAddress(loc.getHostname, loc.getPort)
-            if (initialIsa.isUnresolved) {
-              if (log.isTraceEnabled) {
-                logTrace("failed to resolve bind address: " + loc.getHostname 
+ ":"
-                  + loc.getPort + ", so use default writer")
-              }
-              getNewWriter(keyFamilyQualifier.family, conf, null, fs, 
familyDir)
-            } else {
-              if(log.isDebugEnabled) {
-                logDebug("use favored nodes writer: " + 
initialIsa.getHostString)
-              }
-              getNewWriter(keyFamilyQualifier.family, conf,
-                Array[InetSocketAddress](initialIsa), fs, familyDir)
-            }
-          }
-        })
-
-        val keyValue =new KeyValue(keyFamilyQualifier.rowKey,
-          keyFamilyQualifier.family,
-          keyFamilyQualifier.qualifier,
-          now,cellValue)
-
-        wl.writer.append(keyValue)
-        wl.written += keyValue.getLength
-
-        rollOverRequested = rollOverRequested || wl.written > maxSize
-
-        //This will only roll if we have at least one column family file that 
is
-        //bigger then maxSize and we have finished a given row key
-        if (rollOverRequested && Bytes.compareTo(previousRow, 
keyFamilyQualifier.rowKey) != 0) {
-          rollWriters()
-        }
-
-        previousRow = keyFamilyQualifier.rowKey
-      }
-      //We have finished all the data so lets close up the writers
-      rollWriters()
-    })
-  }
-
-  /**
-   * This is a wrapper class around StoreFile.Writer.  The reason for the
-   * wrapper is to keep the length of the file along side the writer
-   *
-   * @param written The writer to be wrapped
-   * @param writer  The number of bytes written to the writer
-   */
-  class WriterLength(var written:Long, val writer:StoreFile.Writer)
-
-  /**
-   * This is a wrapper over a byte array so it can work as
-   * a key in a hashMap
-   *
-   * @param o1 The Byte Array value
-   */
-  class ByteArrayWrapper (val o1:Array[Byte])
-    extends Comparable[ByteArrayWrapper] with Serializable {
-    override def compareTo(o2: ByteArrayWrapper): Int = {
-      Bytes.compareTo(o1,o2.o1)
-    }
-    override def equals(o2: Any): Boolean = {
-      o2 match {
-        case wrapper: ByteArrayWrapper =>
-          Bytes.equals(o1, wrapper.o1)
-        case _ =>
-          false
-      }
-    }
-    override def hashCode():Int = {
-      Bytes.hashCode(o1)
-    }
-  }
-}
-
-object LatestHBaseContextCache {
-  var latest:HBaseContext = null
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/spark/HBaseDStreamFunctions.scala 
b/loader/src/main/scala/spark/HBaseDStreamFunctions.scala
deleted file mode 100644
index fc45865..0000000
--- a/loader/src/main/scala/spark/HBaseDStreamFunctions.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package spark
-
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.spark.streaming.dstream.DStream
-
-import scala.reflect.ClassTag
-
-/**
- * HBaseDStreamFunctions contains a set of implicit functions that can be
- * applied to a Spark DStream so that we can easily interact with HBase
- */
-object HBaseDStreamFunctions {
-
-  /**
-   * These are implicit methods for a DStream that contains any type of
-   * data.
-   *
-   * @param dStream  This is for dStreams of any type
-   * @tparam T       Type T
-   */
-  implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * put.  This will not return a new Stream.  Think of it like a foreach
-     *
-     * @param hc         The hbaseContext object to identify which
-     *                   HBase cluster connection to use
-     * @param tableName  The tableName that the put will be sent to
-     * @param f          The function that will turn the DStream values
-     *                   into HBase Put objects.
-     */
-    def hbaseBulkPut(hc: HBaseContext,
-                     tableName: TableName,
-                     f: (T) => Put): Unit = {
-      hc.streamBulkPut(dStream, tableName, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new DStream.  Think about it as a DStream map
-     * function.  In that every DStream value will get a new value out of
-     * HBase.  That new value will populate the newly generated DStream.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the put will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       in HBase Get objects
-     * @param convertResult  The function that will convert a HBase
-     *                       Result object into a value that will go
-     *                       into the resulting DStream
-     * @tparam R             The type of Object that will be coming
-     *                       out of the resulting DStream
-     * @return               A resulting DStream with type R objects
-     */
-    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
-                     tableName: TableName,
-                     batchSize:Int, f: (T) => Get, convertResult: (Result) => 
R):
-    DStream[R] = {
-      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * get.  This will return a new DStream.  Think about it as a DStream map
-     * function.  In that every DStream value will get a new value out of
-     * HBase.  That new value will populate the newly generated DStream.
-     *
-     * @param hc             The hbaseContext object to identify which
-     *                       HBase cluster connection to use
-     * @param tableName      The tableName that the put will be sent to
-     * @param batchSize      How many gets to execute in a single batch
-     * @param f              The function that will turn the RDD values
-     *                       in HBase Get objects
-     * @return               A resulting DStream with type R objects
-     */
-    def hbaseBulkGet(hc: HBaseContext,
-                     tableName: TableName, batchSize:Int,
-                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] 
= {
-        hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
-          tableName, batchSize, dStream, f,
-          result => (new ImmutableBytesWritable(result.getRow), result))
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's bulk
-     * Delete.  This will not return a new DStream.
-     *
-     * @param hc         The hbaseContext object to identify which HBase
-     *                   cluster connection to use
-     * @param tableName  The tableName that the deletes will be sent to
-     * @param f          The function that will convert the DStream value into
-     *                   a HBase Delete Object
-     * @param batchSize  The number of Deletes to be sent in a single batch
-     */
-    def hbaseBulkDelete(hc: HBaseContext,
-                        tableName: TableName,
-                        f:(T) => Delete, batchSize:Int): Unit = {
-      hc.streamBulkDelete(dStream, tableName, f, batchSize)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * foreachPartition method.  This will ack very much like a normal DStream
-     * foreach method but for the fact that you will now have a HBase 
connection
-     * while iterating through the values.
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            DStream along with a connection object to HBase
-     */
-    def hbaseForeachPartition(hc: HBaseContext,
-                              f: (Iterator[T], Connection) => Unit): Unit = {
-      hc.streamForeachPartition(dStream, f)
-    }
-
-    /**
-     * Implicit method that gives easy access to HBaseContext's
-     * mapPartitions method.  This will ask very much like a normal DStream
-     * map partitions method but for the fact that you will now have a
-     * HBase connection while iterating through the values
-     *
-     * @param hc  The hbaseContext object to identify which HBase
-     *            cluster connection to use
-     * @param f   This function will get an iterator for a Partition of an
-     *            DStream along with a connection object to HBase
-     * @tparam R  This is the type of objects that will go into the resulting
-     *            DStream
-     * @return    A resulting DStream of type R
-     */
-    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
-                                        f: (Iterator[T], Connection) => 
Iterator[R]):
-    DStream[R] = {
-      hc.streamMapPartitions(dStream, f)
-    }
-  }
-}

Reply via email to