http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/DefaultSource.scala b/loader/src/main/scala/spark/DefaultSource.scala deleted file mode 100644 index cc36a15..0000000 --- a/loader/src/main/scala/spark/DefaultSource.scala +++ /dev/null @@ -1,982 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package spark -// -//import java.util -//import java.util.concurrent.ConcurrentLinkedQueue -// -//import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Result, Scan} -//import org.apache.hadoop.hbase.types._ -//import org.apache.hadoop.hbase.util.{SimplePositionedMutableByteRange, PositionedByteRange, Bytes} -//import org.apache.hadoop.hbase.{TableName, HBaseConfiguration} -//import org.apache.spark.Logging -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.types.DataType -//import org.apache.spark.sql.{Row, SQLContext} -//import org.apache.spark.sql.sources._ -//import org.apache.spark.sql.types._ -// -//import scala.collection.mutable -// -///** -// * DefaultSource for integration with Spark's dataframe datasources. -// * This class will produce a relationProvider based on input given to it from spark -// * -// * In all this DefaultSource support the following datasource functionality -// * - Scan range pruning through filter push down logic based on rowKeys -// * - Filter push down logic on HBase Cells -// * - Qualifier filtering based on columns used in the SparkSQL statement -// * - Type conversions of basic SQL types. All conversions will be -// * Through the HBase Bytes object commands. 
-// */ -//class DefaultSource extends RelationProvider { -// -// val TABLE_KEY:String = "hbase.table" -// val SCHEMA_COLUMNS_MAPPING_KEY:String = "hbase.columns.mapping" -// val BATCHING_NUM_KEY:String = "hbase.batching.num" -// val CACHING_NUM_KEY:String = "hbase.caching.num" -// val HBASE_CONFIG_RESOURCES_LOCATIONS:String = "hbase.config.resources" -// val USE_HBASE_CONTEXT:String = "hbase.use.hbase.context" -// -// /** -// * Is given input from SparkSQL to construct a BaseRelation -// * @param sqlContext SparkSQL context -// * @param parameters Parameters given to us from SparkSQL -// * @return A BaseRelation Object -// */ -// override def createRelation(sqlContext: SQLContext, -// parameters: Map[String, String]): -// BaseRelation = { -// -// -// val tableName = parameters.get(TABLE_KEY) -// if (tableName.isEmpty) -// new IllegalArgumentException("Invalid value for " + TABLE_KEY +" '" + tableName + "'") -// -// val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "") -// val batchingNumStr = parameters.getOrElse(BATCHING_NUM_KEY, "1000") -// val cachingNumStr = parameters.getOrElse(CACHING_NUM_KEY, "1000") -// val hbaseConfigResources = parameters.getOrElse(HBASE_CONFIG_RESOURCES_LOCATIONS, "") -// val useHBaseReources = parameters.getOrElse(USE_HBASE_CONTEXT, "true") -// -// val batchingNum:Int = try { -// batchingNumStr.toInt -// } catch { -// case e:NumberFormatException => throw -// new IllegalArgumentException("Invalid value for " + BATCHING_NUM_KEY +" '" -// + batchingNumStr + "'", e) -// } -// -// val cachingNum:Int = try { -// cachingNumStr.toInt -// } catch { -// case e:NumberFormatException => throw -// new IllegalArgumentException("Invalid value for " + CACHING_NUM_KEY +" '" -// + cachingNumStr + "'", e) -// } -// -// new HBaseRelation(tableName.get, -// generateSchemaMappingMap(schemaMappingString), -// batchingNum.toInt, -// cachingNum.toInt, -// hbaseConfigResources, -// useHBaseReources.equalsIgnoreCase("true"))(sqlContext) -// } -// -// /** -// * Reads the SCHEMA_COLUMNS_MAPPING_KEY and converts it to a map of -// * SchemaQualifierDefinitions with the original sql column name as the key -// * @param schemaMappingString The schema mapping string from the SparkSQL map -// * @return A map of definitions keyed by the SparkSQL column name -// */ -// def generateSchemaMappingMap(schemaMappingString:String): -// java.util.HashMap[String, SchemaQualifierDefinition] = { -// try { -// val columnDefinitions = schemaMappingString.split(',') -// val resultingMap = new java.util.HashMap[String, SchemaQualifierDefinition]() -// columnDefinitions.map(cd => { -// val parts = cd.trim.split(' ') -// -// //Make sure we get three parts -// //<ColumnName> <ColumnType> <ColumnFamily:Qualifier> -// if (parts.length == 3) { -// val hbaseDefinitionParts = if (parts(2).charAt(0) == ':') { -// Array[String]("", "key") -// } else { -// parts(2).split(':') -// } -// resultingMap.put(parts(0), new SchemaQualifierDefinition(parts(0), -// parts(1), hbaseDefinitionParts(0), hbaseDefinitionParts(1))) -// } else { -// throw new IllegalArgumentException("Invalid value for schema mapping '" + cd + -// "' should be '<columnName> <columnType> <columnFamily>:<qualifier>' " + -// "for columns and '<columnName> <columnType> :<qualifier>' for rowKeys") -// } -// }) -// resultingMap -// } catch { -// case e:Exception => throw -// new IllegalArgumentException("Invalid value for " + SCHEMA_COLUMNS_MAPPING_KEY + -// " '" + schemaMappingString + "'", e ) -// } -// } -//} -// -///** -// * 
Implementation of Spark BaseRelation that will build up our scan logic -// * , do the scan pruning, filter push down, and value conversions -// * -// * @param tableName HBase table that we plan to read from -// * @param schemaMappingDefinition SchemaMapping information to map HBase -// * Qualifiers to SparkSQL columns -// * @param batchingNum The batching number to be applied to the -// * scan object -// * @param cachingNum The caching number to be applied to the -// * scan object -// * @param configResources Optional comma separated list of config resources -// * to get based on their URI -// * @param useHBaseContext If true this will look to see if -// * HBaseContext.latest is populated to use that -// * connection information -// * @param sqlContext SparkSQL context -// */ -//class HBaseRelation (val tableName:String, -// val schemaMappingDefinition: -// java.util.HashMap[String, SchemaQualifierDefinition], -// val batchingNum:Int, -// val cachingNum:Int, -// val configResources:String, -// val useHBaseContext:Boolean) ( -// @transient val sqlContext:SQLContext) -// extends BaseRelation with PrunedFilteredScan with Logging { -// -// //create or get latest HBaseContext -// @transient val hbaseContext:HBaseContext = if (useHBaseContext) { -// LatestHBaseContextCache.latest -// } else { -// val config = HBaseConfiguration.create() -// configResources.split(",").foreach( r => config.addResource(r)) -// new HBaseContext(sqlContext.sparkContext, config) -// } -// -// /** -// * Generates a Spark SQL schema object so Spark SQL knows what is being -// * provided by this BaseRelation -// * -// * @return schema generated from the SCHEMA_COLUMNS_MAPPING_KEY value -// */ -// override def schema: StructType = { -// -// val metadataBuilder = new MetadataBuilder() -// -// val structFieldArray = new Array[StructField](schemaMappingDefinition.size()) -// -// val schemaMappingDefinitionIt = schemaMappingDefinition.values().iterator() -// var indexCounter = 0 -// while (schemaMappingDefinitionIt.hasNext) { -// val c = schemaMappingDefinitionIt.next() -// -// val metadata = metadataBuilder.putString("name", c.columnName).build() -// val structField = -// new StructField(c.columnName, c.columnSparkSqlType, nullable = true, metadata) -// -// structFieldArray(indexCounter) = structField -// indexCounter += 1 -// } -// -// val result = new StructType(structFieldArray) -// result -// } -// -// /** -// * Here we are building the functionality to populate the resulting RDD[Row] -// * Here is where we will do the following: -// * - Filter push down -// * - Scan or GetList pruning -// * - Executing our scan(s) or/and GetList to generate result -// * -// * @param requiredColumns The columns that are being requested by the requesting query -// * @param filters The filters that are being applied by the requesting query -// * @return RDD will all the results from HBase needed for SparkSQL to -// * execute the query on -// */ -// override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { -// -// val columnFilterCollection = buildColumnFilterCollection(filters) -// -// val requiredQualifierDefinitionArray = new mutable.MutableList[SchemaQualifierDefinition] -// requiredColumns.foreach( c => { -// val definition = schemaMappingDefinition.get(c) -// if (definition.columnFamilyBytes.length > 0) { -// requiredQualifierDefinitionArray += definition -// } -// }) -// -// //Create a local variable so that scala doesn't have to -// // serialize the whole HBaseRelation Object -// val 
serializableDefinitionMap = schemaMappingDefinition -// -// -// //retain the information for unit testing checks -// DefaultSourceStaticUtils.populateLatestExecutionRules(columnFilterCollection, -// requiredQualifierDefinitionArray) -// var resultRDD: RDD[Row] = null -// -// if (columnFilterCollection != null) { -// val pushDownFilterJava = -// new SparkSQLPushDownFilter( -// columnFilterCollection.generateFamilyQualifiterFilterMap(schemaMappingDefinition)) -// -// val getList = new util.ArrayList[Get]() -// val rddList = new util.ArrayList[RDD[Row]]() -// -// val it = columnFilterCollection.columnFilterMap.iterator -// while (it.hasNext) { -// val e = it.next() -// val columnDefinition = schemaMappingDefinition.get(e._1) -// //check is a rowKey -// if (columnDefinition != null && columnDefinition.columnFamily.isEmpty) { -// //add points to getList -// e._2.points.foreach(p => { -// val get = new Get(p) -// requiredQualifierDefinitionArray.foreach( d => -// get.addColumn(d.columnFamilyBytes, d.qualifierBytes)) -// getList.add(get) -// }) -// -// val rangeIt = e._2.ranges.iterator -// -// while (rangeIt.hasNext) { -// val r = rangeIt.next() -// -// val scan = new Scan() -// scan.setBatch(batchingNum) -// scan.setCaching(cachingNum) -// requiredQualifierDefinitionArray.foreach( d => -// scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) -// -// if (pushDownFilterJava.columnFamilyQualifierFilterMap.size() > 0) { -// scan.setFilter(pushDownFilterJava) -// } -// -// //Check if there is a lower bound -// if (r.lowerBound != null && r.lowerBound.length > 0) { -// -// if (r.isLowerBoundEqualTo) { -// //HBase startRow is inclusive: Therefore it acts like isLowerBoundEqualTo -// // by default -// scan.setStartRow(r.lowerBound) -// } else { -// //Since we don't equalTo we want the next value we need -// // to add another byte to the start key. That new byte will be -// // the min byte value. -// val newArray = new Array[Byte](r.lowerBound.length + 1) -// System.arraycopy(r.lowerBound, 0, newArray, 0, r.lowerBound.length) -// -// //new Min Byte -// newArray(r.lowerBound.length) = Byte.MinValue -// scan.setStartRow(newArray) -// } -// } -// -// //Check if there is a upperBound -// if (r.upperBound != null && r.upperBound.length > 0) { -// if (r.isUpperBoundEqualTo) { -// //HBase stopRow is exclusive: therefore it DOESN'T ast like isUpperBoundEqualTo -// // by default. So we need to add a new max byte to the stopRow key -// val newArray = new Array[Byte](r.upperBound.length + 1) -// System.arraycopy(r.upperBound, 0, newArray, 0, r.upperBound.length) -// -// //New Max Bytes -// newArray(r.upperBound.length) = Byte.MaxValue -// -// scan.setStopRow(newArray) -// } else { -// //Here equalTo is false for Upper bound which is exclusive and -// // HBase stopRow acts like that by default so no need to mutate the -// // rowKey -// scan.setStopRow(r.upperBound) -// } -// } -// -// val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => { -// Row.fromSeq(requiredColumns.map(c => -// DefaultSourceStaticUtils.getValue(c, serializableDefinitionMap, r._2))) -// }) -// rddList.add(rdd) -// } -// } -// } -// -// //If there is more then one RDD then we have to union them together -// for (i <- 0 until rddList.size()) { -// if (resultRDD == null) resultRDD = rddList.get(i) -// else resultRDD = resultRDD.union(rddList.get(i)) -// -// } -// -// //If there are gets then we can get them from the driver and union that rdd in -// // with the rest of the values. 
-// if (getList.size() > 0) { -// val connection = ConnectionFactory.createConnection(hbaseContext.tmpHdfsConfiguration) -// try { -// val table = connection.getTable(TableName.valueOf(tableName)) -// try { -// val results = table.get(getList) -// val rowList = mutable.MutableList[Row]() -// for (i <- 0 until results.length) { -// val rowArray = requiredColumns.map(c => -// DefaultSourceStaticUtils.getValue(c, schemaMappingDefinition, results(i))) -// rowList += Row.fromSeq(rowArray) -// } -// val getRDD = sqlContext.sparkContext.parallelize(rowList) -// if (resultRDD == null) resultRDD = getRDD -// else { -// resultRDD = resultRDD.union(getRDD) -// } -// } finally { -// table.close() -// } -// } finally { -// connection.close() -// } -// } -// } -// if (resultRDD == null) { -// val scan = new Scan() -// scan.setBatch(batchingNum) -// scan.setCaching(cachingNum) -// requiredQualifierDefinitionArray.foreach( d => -// scan.addColumn(d.columnFamilyBytes, d.qualifierBytes)) -// -// val rdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan).map(r => { -// Row.fromSeq(requiredColumns.map(c => DefaultSourceStaticUtils.getValue(c, -// serializableDefinitionMap, r._2))) -// }) -// resultRDD=rdd -// } -// resultRDD -// } -// -// /** -// * Root recursive function that will loop over the filters provided by -// * SparkSQL. Some filters are AND or OR functions and contain additional filters -// * hence the need for recursion. -// * -// * @param filters Filters provided by SparkSQL. -// * Filters are joined with the AND operater -// * @return A ColumnFilterCollection whish is a consolidated construct to -// * hold the high level filter information -// */ -// def buildColumnFilterCollection(filters: Array[Filter]): ColumnFilterCollection = { -// var superCollection: ColumnFilterCollection = null -// -// filters.foreach( f => { -// val parentCollection = new ColumnFilterCollection -// buildColumnFilterCollection(parentCollection, f) -// if (superCollection == null) -// superCollection = parentCollection -// else -// superCollection.mergeIntersect(parentCollection) -// }) -// superCollection -// } -// -// /** -// * Recursive function that will work to convert Spark Filter -// * objects to ColumnFilterCollection -// * -// * @param parentFilterCollection Parent ColumnFilterCollection -// * @param filter Current given filter from SparkSQL -// */ -// def buildColumnFilterCollection(parentFilterCollection:ColumnFilterCollection, -// filter:Filter): Unit = { -// filter match { -// -// case EqualTo(attr, value) => -// parentFilterCollection.mergeUnion(attr, -// new ColumnFilter(DefaultSourceStaticUtils.getByteValue(attr, -// schemaMappingDefinition, value.toString))) -// -// case LessThan(attr, value) => -// parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, -// new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, -// schemaMappingDefinition, value.toString), false, -// new Array[Byte](0), true))) -// -// case GreaterThan(attr, value) => -// parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, -// new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, -// schemaMappingDefinition, value.toString), false))) -// -// case LessThanOrEqual(attr, value) => -// parentFilterCollection.mergeUnion(attr, new ColumnFilter(null, -// new ScanRange(DefaultSourceStaticUtils.getByteValue(attr, -// schemaMappingDefinition, value.toString), true, -// new Array[Byte](0), true))) -// -// case GreaterThanOrEqual(attr, value) => -// parentFilterCollection.mergeUnion(attr, new 
ColumnFilter(null, -// new ScanRange(null, true, DefaultSourceStaticUtils.getByteValue(attr, -// schemaMappingDefinition, value.toString), true))) -// -// case Or(left, right) => -// buildColumnFilterCollection(parentFilterCollection, left) -// val rightSideCollection = new ColumnFilterCollection -// buildColumnFilterCollection(rightSideCollection, right) -// parentFilterCollection.mergeUnion(rightSideCollection) -// case And(left, right) => -// buildColumnFilterCollection(parentFilterCollection, left) -// val rightSideCollection = new ColumnFilterCollection -// buildColumnFilterCollection(rightSideCollection, right) -// parentFilterCollection.mergeIntersect(rightSideCollection) -// case _ => //nothing -// } -// } -//} -// -///** -// * Construct to contains column data that spend SparkSQL and HBase -// * -// * @param columnName SparkSQL column name -// * @param colType SparkSQL column type -// * @param columnFamily HBase column family -// * @param qualifier HBase qualifier name -// */ -//case class SchemaQualifierDefinition(columnName:String, -// colType:String, -// columnFamily:String, -// qualifier:String) extends Serializable { -// val columnFamilyBytes = Bytes.toBytes(columnFamily) -// val qualifierBytes = Bytes.toBytes(qualifier) -// val columnSparkSqlType:DataType = if (colType.equals("BOOLEAN")) BooleanType -// else if (colType.equals("TINYINT")) IntegerType -// else if (colType.equals("INT")) IntegerType -// else if (colType.equals("BIGINT")) LongType -// else if (colType.equals("FLOAT")) FloatType -// else if (colType.equals("DOUBLE")) DoubleType -// else if (colType.equals("STRING")) StringType -// else if (colType.equals("TIMESTAMP")) TimestampType -// else if (colType.equals("DECIMAL")) StringType //DataTypes.createDecimalType(precision, scale) -// else throw new IllegalArgumentException("Unsupported column type :" + colType) -//} -// -///** -// * Construct to contain a single scan ranges information. 
Also -// * provide functions to merge with other scan ranges through AND -// * or OR operators -// * -// * @param upperBound Upper bound of scan -// * @param isUpperBoundEqualTo Include upper bound value in the results -// * @param lowerBound Lower bound of scan -// * @param isLowerBoundEqualTo Include lower bound value in the results -// */ -//class ScanRange(var upperBound:Array[Byte], var isUpperBoundEqualTo:Boolean, -// var lowerBound:Array[Byte], var isLowerBoundEqualTo:Boolean) -// extends Serializable { -// -// /** -// * Function to merge another scan object through a AND operation -// * @param other Other scan object -// */ -// def mergeIntersect(other:ScanRange): Unit = { -// val upperBoundCompare = compareRange(upperBound, other.upperBound) -// val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) -// -// upperBound = if (upperBoundCompare <0) upperBound else other.upperBound -// lowerBound = if (lowerBoundCompare >0) lowerBound else other.lowerBound -// -// isLowerBoundEqualTo = if (lowerBoundCompare == 0) -// isLowerBoundEqualTo && other.isLowerBoundEqualTo -// else isLowerBoundEqualTo -// -// isUpperBoundEqualTo = if (upperBoundCompare == 0) -// isUpperBoundEqualTo && other.isUpperBoundEqualTo -// else isUpperBoundEqualTo -// } -// -// /** -// * Function to merge another scan object through a OR operation -// * @param other Other scan object -// */ -// def mergeUnion(other:ScanRange): Unit = { -// -// val upperBoundCompare = compareRange(upperBound, other.upperBound) -// val lowerBoundCompare = compareRange(lowerBound, other.lowerBound) -// -// upperBound = if (upperBoundCompare >0) upperBound else other.upperBound -// lowerBound = if (lowerBoundCompare <0) lowerBound else other.lowerBound -// -// isLowerBoundEqualTo = if (lowerBoundCompare == 0) -// isLowerBoundEqualTo || other.isLowerBoundEqualTo -// else isLowerBoundEqualTo -// -// isUpperBoundEqualTo = if (upperBoundCompare == 0) -// isUpperBoundEqualTo || other.isUpperBoundEqualTo -// else isUpperBoundEqualTo -// } -// -// /** -// * Common function to see if this scan over laps with another -// * -// * Reference Visual -// * -// * A B -// * |---------------------------| -// * LL--------------LU -// * RL--------------RU -// * -// * A = lowest value is byte[0] -// * B = highest value is null -// * LL = Left Lower Bound -// * LU = Left Upper Bound -// * RL = Right Lower Bound -// * RU = Right Upper Bound -// * -// * @param other Other scan object -// * @return True is overlap false is not overlap -// */ -// def doesOverLap(other:ScanRange): Boolean = { -// -// var leftRange:ScanRange = null -// var rightRange:ScanRange = null -// -// //First identify the Left range -// // Also lower bound can't be null -// if (Bytes.compareTo(lowerBound, other.lowerBound) <=0) { -// leftRange = this -// rightRange = other -// } else { -// leftRange = other -// rightRange = this -// } -// -// //Then see if leftRange goes to null or if leftRange.upperBound -// // upper is greater or equals to rightRange.lowerBound -// leftRange.upperBound == null || -// Bytes.compareTo(leftRange.upperBound, rightRange.lowerBound) >= 0 -// } -// -// /** -// * Special compare logic because we can have null values -// * for left or right bound -// * -// * @param left Left byte array -// * @param right Right byte array -// * @return 0 for equals 1 is left is greater and -1 is right is greater -// */ -// def compareRange(left:Array[Byte], right:Array[Byte]): Int = { -// if (left == null && right == null) 0 -// else if (left == null && right != null) 1 
-// else if (left != null && right == null) -1 -// else Bytes.compareTo(left, right) -// } -// override def toString:String = { -// "ScanRange:(" + Bytes.toString(upperBound) + "," + isUpperBoundEqualTo + "," + -// Bytes.toString(lowerBound) + "," + isLowerBoundEqualTo + ")" -// } -//} -// -///** -// * Contains information related to a filters for a given column. -// * This can contain many ranges or points. -// * -// * @param currentPoint the initial point when the filter is created -// * @param currentRange the initial scanRange when the filter is created -// */ -//class ColumnFilter (currentPoint:Array[Byte] = null, -// currentRange:ScanRange = null, -// var points:mutable.MutableList[Array[Byte]] = -// new mutable.MutableList[Array[Byte]](), -// var ranges:mutable.MutableList[ScanRange] = -// new mutable.MutableList[ScanRange]() ) extends Serializable { -// //Collection of ranges -// if (currentRange != null ) ranges.+=(currentRange) -// -// //Collection of points -// if (currentPoint != null) points.+=(currentPoint) -// -// /** -// * This will validate a give value through the filter's points and/or ranges -// * the result will be if the value passed the filter -// * -// * @param value Value to be validated -// * @param valueOffSet The offset of the value -// * @param valueLength The length of the value -// * @return True is the value passes the filter false if not -// */ -// def validate(value:Array[Byte], valueOffSet:Int, valueLength:Int):Boolean = { -// var result = false -// -// points.foreach( p => { -// if (Bytes.equals(p, 0, p.length, value, valueOffSet, valueLength)) { -// result = true -// } -// }) -// -// ranges.foreach( r => { -// val upperBoundPass = r.upperBound == null || -// (r.isUpperBoundEqualTo && -// Bytes.compareTo(r.upperBound, 0, r.upperBound.length, -// value, valueOffSet, valueLength) >= 0) || -// (!r.isUpperBoundEqualTo && -// Bytes.compareTo(r.upperBound, 0, r.upperBound.length, -// value, valueOffSet, valueLength) > 0) -// -// val lowerBoundPass = r.lowerBound == null || r.lowerBound.length == 0 -// (r.isLowerBoundEqualTo && -// Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length, -// value, valueOffSet, valueLength) <= 0) || -// (!r.isLowerBoundEqualTo && -// Bytes.compareTo(r.lowerBound, 0, r.lowerBound.length, -// value, valueOffSet, valueLength) < 0) -// -// result = result || (upperBoundPass && lowerBoundPass) -// }) -// result -// } -// -// /** -// * This will allow us to merge filter logic that is joined to the existing filter -// * through a OR operator -// * -// * @param other Filter to merge -// */ -// def mergeUnion(other:ColumnFilter): Unit = { -// other.points.foreach( p => points += p) -// -// other.ranges.foreach( otherR => { -// var doesOverLap = false -// ranges.foreach{ r => -// if (r.doesOverLap(otherR)) { -// r.mergeUnion(otherR) -// doesOverLap = true -// }} -// if (!doesOverLap) ranges.+=(otherR) -// }) -// } -// -// /** -// * This will allow us to merge filter logic that is joined to the existing filter -// * through a AND operator -// * -// * @param other Filter to merge -// */ -// def mergeIntersect(other:ColumnFilter): Unit = { -// val survivingPoints = new mutable.MutableList[Array[Byte]]() -// points.foreach( p => { -// other.points.foreach( otherP => { -// if (Bytes.equals(p, otherP)) { -// survivingPoints.+=(p) -// } -// }) -// }) -// points = survivingPoints -// -// val survivingRanges = new mutable.MutableList[ScanRange]() -// -// other.ranges.foreach( otherR => { -// ranges.foreach( r => { -// if (r.doesOverLap(otherR)) { 
-// r.mergeIntersect(otherR) -// survivingRanges += r -// } -// }) -// }) -// ranges = survivingRanges -// } -// -// override def toString:String = { -// val strBuilder = new StringBuilder -// strBuilder.append("(points:(") -// var isFirst = true -// points.foreach( p => { -// if (isFirst) isFirst = false -// else strBuilder.append(",") -// strBuilder.append(Bytes.toString(p)) -// }) -// strBuilder.append("),ranges:") -// isFirst = true -// ranges.foreach( r => { -// if (isFirst) isFirst = false -// else strBuilder.append(",") -// strBuilder.append(r) -// }) -// strBuilder.append("))") -// strBuilder.toString() -// } -//} -// -///** -// * A collection of ColumnFilters indexed by column names. -// * -// * Also contains merge commends that will consolidate the filters -// * per column name -// */ -//class ColumnFilterCollection { -// val columnFilterMap = new mutable.HashMap[String, ColumnFilter] -// -// def clear(): Unit = { -// columnFilterMap.clear() -// } -// -// /** -// * This will allow us to merge filter logic that is joined to the existing filter -// * through a OR operator. This will merge a single columns filter -// * -// * @param column The column to be merged -// * @param other The other ColumnFilter object to merge -// */ -// def mergeUnion(column:String, other:ColumnFilter): Unit = { -// val existingFilter = columnFilterMap.get(column) -// if (existingFilter.isEmpty) { -// columnFilterMap.+=((column, other)) -// } else { -// existingFilter.get.mergeUnion(other) -// } -// } -// -// /** -// * This will allow us to merge all filters in the existing collection -// * to the filters in the other collection. All merges are done as a result -// * of a OR operator -// * -// * @param other The other Column Filter Collection to be merged -// */ -// def mergeUnion(other:ColumnFilterCollection): Unit = { -// other.columnFilterMap.foreach( e => { -// mergeUnion(e._1, e._2) -// }) -// } -// -// /** -// * This will allow us to merge all filters in the existing collection -// * to the filters in the other collection. All merges are done as a result -// * of a AND operator -// * -// * @param other The column filter from the other collection -// */ -// def mergeIntersect(other:ColumnFilterCollection): Unit = { -// other.columnFilterMap.foreach( e => { -// val existingColumnFilter = columnFilterMap.get(e._1) -// if (existingColumnFilter.isEmpty) { -// columnFilterMap += e -// } else { -// existingColumnFilter.get.mergeIntersect(e._2) -// } -// }) -// } -// -// /** -// * This will collect all the filter information in a way that is optimized -// * for the HBase filter commend. 
Allowing the filter to be accessed -// * with columnFamily and qualifier information -// * -// * @param schemaDefinitionMap Schema Map that will help us map the right filters -// * to the correct columns -// * @return HashMap oc column filters -// */ -// def generateFamilyQualifiterFilterMap(schemaDefinitionMap: -// java.util.HashMap[String, -// SchemaQualifierDefinition]): -// util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter] = { -// val familyQualifierFilterMap = -// new util.HashMap[ColumnFamilyQualifierMapKeyWrapper, ColumnFilter]() -// -// columnFilterMap.foreach( e => { -// val definition = schemaDefinitionMap.get(e._1) -// //Don't add rowKeyFilter -// if (definition.columnFamilyBytes.size > 0) { -// familyQualifierFilterMap.put( -// new ColumnFamilyQualifierMapKeyWrapper( -// definition.columnFamilyBytes, 0, definition.columnFamilyBytes.length, -// definition.qualifierBytes, 0, definition.qualifierBytes.length), e._2) -// } -// }) -// familyQualifierFilterMap -// } -// -// override def toString:String = { -// val strBuilder = new StringBuilder -// columnFilterMap.foreach( e => strBuilder.append(e)) -// strBuilder.toString() -// } -//} -// -///** -// * Status object to store static functions but also to hold last executed -// * information that can be used for unit testing. -// */ -//object DefaultSourceStaticUtils { -// -// val rawInteger = new RawInteger -// val rawLong = new RawLong -// val rawFloat = new RawFloat -// val rawDouble = new RawDouble -// val rawString = RawString.ASCENDING -// -// val byteRange = new ThreadLocal[PositionedByteRange]{ -// override def initialValue(): PositionedByteRange = { -// val range = new SimplePositionedMutableByteRange() -// range.setOffset(0) -// range.setPosition(0) -// } -// } -// -// def getFreshByteRange(bytes:Array[Byte]): PositionedByteRange = { -// getFreshByteRange(bytes, 0, bytes.length) -// } -// -// def getFreshByteRange(bytes:Array[Byte], offset:Int = 0, length:Int): PositionedByteRange = { -// byteRange.get().set(bytes).setLength(length).setOffset(offset) -// } -// -// //This will contain the last 5 filters and required fields used in buildScan -// // These values can be used in unit testing to make sure we are converting -// // The Spark SQL input correctly -// val lastFiveExecutionRules = -// new ConcurrentLinkedQueue[ExecutionRuleForUnitTesting]() -// -// /** -// * This method is to populate the lastFiveExecutionRules for unit test perposes -// * This method is not thread safe. 
-// * -// * @param columnFilterCollection The filters in the last job -// * @param requiredQualifierDefinitionArray The required columns in the last job -// */ -// def populateLatestExecutionRules(columnFilterCollection: ColumnFilterCollection, -// requiredQualifierDefinitionArray: -// mutable.MutableList[SchemaQualifierDefinition]):Unit = { -// lastFiveExecutionRules.add(new ExecutionRuleForUnitTesting( -// columnFilterCollection, requiredQualifierDefinitionArray)) -// while (lastFiveExecutionRules.size() > 5) { -// lastFiveExecutionRules.poll() -// } -// } -// -// /** -// * This method will convert the result content from HBase into the -// * SQL value type that is requested by the Spark SQL schema definition -// * -// * @param columnName The name of the SparkSQL Column -// * @param schemaMappingDefinition The schema definition map -// * @param r The result object from HBase -// * @return The converted object type -// */ -// def getValue(columnName: String, -// schemaMappingDefinition: -// java.util.HashMap[String, SchemaQualifierDefinition], -// r: Result): Any = { -// -// val columnDef = schemaMappingDefinition.get(columnName) -// -// if (columnDef == null) throw new IllegalArgumentException("Unknown column:" + columnName) -// -// -// if (columnDef.columnFamilyBytes.isEmpty) { -// val row = r.getRow -// -// columnDef.columnSparkSqlType match { -// case IntegerType => rawInteger.decode(getFreshByteRange(row)) -// case LongType => rawLong.decode(getFreshByteRange(row)) -// case FloatType => rawFloat.decode(getFreshByteRange(row)) -// case DoubleType => rawDouble.decode(getFreshByteRange(row)) -// case StringType => rawString.decode(getFreshByteRange(row)) -// case TimestampType => rawLong.decode(getFreshByteRange(row)) -// case _ => Bytes.toString(row) -// } -// } else { -// val cellByteValue = -// r.getColumnLatestCell(columnDef.columnFamilyBytes, columnDef.qualifierBytes) -// if (cellByteValue == null) null -// else columnDef.columnSparkSqlType match { -// case IntegerType => rawInteger.decode(getFreshByteRange(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength)) -// case LongType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength)) -// case FloatType => rawFloat.decode(getFreshByteRange(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength)) -// case DoubleType => rawDouble.decode(getFreshByteRange(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength)) -// case StringType => Bytes.toString(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength) -// case TimestampType => rawLong.decode(getFreshByteRange(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength)) -// case _ => Bytes.toString(cellByteValue.getValueArray, -// cellByteValue.getValueOffset, cellByteValue.getValueLength) -// } -// } -// } -// -// /** -// * This will convert the value from SparkSQL to be stored into HBase using the -// * right byte Type -// * -// * @param columnName SparkSQL column name -// * @param schemaMappingDefinition Schema definition map -// * @param value String value from SparkSQL -// * @return Returns the byte array to go into HBase -// */ -// def getByteValue(columnName: String, -// schemaMappingDefinition: -// java.util.HashMap[String, SchemaQualifierDefinition], -// value: String): Array[Byte] = { -// -// val columnDef = 
schemaMappingDefinition.get(columnName) -// -// if (columnDef == null) { -// throw new IllegalArgumentException("Unknown column:" + columnName) -// } else { -// columnDef.columnSparkSqlType match { -// case IntegerType => -// val result = new Array[Byte](Bytes.SIZEOF_INT) -// val localDataRange = getFreshByteRange(result) -// rawInteger.encode(localDataRange, value.toInt) -// localDataRange.getBytes -// case LongType => -// val result = new Array[Byte](Bytes.SIZEOF_LONG) -// val localDataRange = getFreshByteRange(result) -// rawLong.encode(localDataRange, value.toLong) -// localDataRange.getBytes -// case FloatType => -// val result = new Array[Byte](Bytes.SIZEOF_FLOAT) -// val localDataRange = getFreshByteRange(result) -// rawFloat.encode(localDataRange, value.toFloat) -// localDataRange.getBytes -// case DoubleType => -// val result = new Array[Byte](Bytes.SIZEOF_DOUBLE) -// val localDataRange = getFreshByteRange(result) -// rawDouble.encode(localDataRange, value.toDouble) -// localDataRange.getBytes -// case StringType => -// Bytes.toBytes(value) -// case TimestampType => -// val result = new Array[Byte](Bytes.SIZEOF_LONG) -// val localDataRange = getFreshByteRange(result) -// rawLong.encode(localDataRange, value.toLong) -// localDataRange.getBytes -// -// case _ => Bytes.toBytes(value) -// } -// } -// } -//} -// -//class ExecutionRuleForUnitTesting(val columnFilterCollection: ColumnFilterCollection, -// val requiredQualifierDefinitionArray: -// mutable.MutableList[SchemaQualifierDefinition])
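
For context on the relation being deleted above: the option keys it documents (hbase.table, hbase.columns.mapping, hbase.batching.num, hbase.caching.num) were consumed through Spark's standard DataFrameReader. The following is a minimal usage sketch, not part of the patch; it assumes a Spark 1.x SQLContext named sqlContext, a reachable HBase table "t1" with column family "cf", and that format("spark") resolves to the (now removed) spark.DefaultSource class.

    // Minimal sketch of driving the removed spark.DefaultSource relation.
    // Assumptions: sqlContext exists, table "t1" / family "cf" are hypothetical names.
    val df = sqlContext.read
      .format("spark")                      // resolved to spark.DefaultSource by Spark's provider lookup
      .option("hbase.table", "t1")
      .option("hbase.columns.mapping",
        // "<name> <type> :<qualifier>" marks the row key;
        // "<name> <type> <family>:<qualifier>" maps a cell column
        "KEY_FIELD STRING :key, A_FIELD STRING cf:a, B_FIELD INT cf:b")
      .option("hbase.batching.num", "1000")
      .option("hbase.caching.num", "1000")
      .load()

    // Predicates on the row-key column drive Scan-range pruning / Get lists;
    // predicates on cell columns are pushed down via SparkSQLPushDownFilter.
    df.filter("KEY_FIELD >= 'row005' AND B_FIELD < 10").show()
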
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala b/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala deleted file mode 100644 index 2e4023c..0000000 --- a/loader/src/main/scala/spark/FamilyHFileWriteOptions.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import java.io.Serializable - -/** - * This object will hold optional data for how a given column family's - * writer will work - * - * @param compression String to define the Compression to be used in the HFile - * @param bloomType String to define the bloom type to be used in the HFile - * @param blockSize The block size to be used in the HFile - * @param dataBlockEncoding String to define the data block encoding to be used - * in the HFile - */ -class FamilyHFileWriteOptions( val compression:String, - val bloomType: String, - val blockSize: Int, - val dataBlockEncoding: String) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/HBaseContext.scala b/loader/src/main/scala/spark/HBaseContext.scala deleted file mode 100644 index 4ea58f1..0000000 --- a/loader/src/main/scala/spark/HBaseContext.scala +++ /dev/null @@ -1,849 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package spark - -import java.net.InetSocketAddress -import java.util - -import org.apache.hadoop.hbase.fs.HFileSystem -import org.apache.hadoop.hbase._ -import org.apache.hadoop.hbase.io.compress.Compression -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder} -import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType} -import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD -import org.apache.hadoop.conf.Configuration -import spark.HBaseRDDFunctions._ -import org.apache.hadoop.hbase.client._ -import scala.reflect.ClassTag -import org.apache.spark.{Logging, SerializableWritable, SparkContext} -import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, -TableInputFormat, IdentityTableMapper} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.streaming.dstream.DStream -import java.io._ -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod -import org.apache.hadoop.fs.{Path, FileSystem} -import scala.collection.mutable - -/** - * HBaseContext is a façade for HBase operations - * like bulk put, get, increment, delete, and scan - * - * HBaseContext will take the responsibilities - * of disseminating the configuration information - * to the working and managing the life cycle of HConnections. - */ -class HBaseContext(@transient sc: SparkContext, - @transient config: Configuration, - val tmpHdfsConfgFile: String = null) - extends Serializable with Logging { - - @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() - @transient var tmpHdfsConfiguration:Configuration = config - @transient var appliedCredentials = false - @transient val job = Job.getInstance(config) - TableMapReduceUtil.initCredentials(job) - val broadcastedConf = sc.broadcast(new SerializableWritable(config)) - val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) - - LatestHBaseContextCache.latest = this - - if (tmpHdfsConfgFile != null && config != null) { - val fs = FileSystem.newInstance(config) - val tmpPath = new Path(tmpHdfsConfgFile) - if (!fs.exists(tmpPath)) { - val outputStream = fs.create(tmpPath) - config.write(outputStream) - outputStream.close() - } else { - logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") - } - } - - /** - * A simple enrichment of the traditional Spark RDD foreachPartition. - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. 
All HConnection - * management is handled outside this method - * - * @param rdd Original RDD with data to iterate over - * @param f Function to be given a iterator to iterate through - * the RDD values and a HConnection object to interact - * with HBase - */ - def foreachPartition[T](rdd: RDD[T], - f: (Iterator[T], Connection) => Unit):Unit = { - rdd.foreachPartition( - it => hbaseForeachPartition(broadcastedConf, it, f)) - } - - /** - * A simple enrichment of the traditional Spark Streaming dStream foreach - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * @param dstream Original DStream with data to iterate over - * @param f Function to be given a iterator to iterate through - * the DStream values and a HConnection object to - * interact with HBase - */ - def foreachPartition[T](dstream: DStream[T], - f: (Iterator[T], Connection) => Unit):Unit = { - dstream.foreachRDD((rdd, time) => { - foreachPartition(rdd, f) - }) - } - - /** - * A simple enrichment of the traditional Spark RDD mapPartition. - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * @param rdd Original RDD with data to iterate over - * @param mp Function to be given a iterator to iterate through - * the RDD values and a HConnection object to interact - * with HBase - * @return Returns a new RDD generated by the user definition - * function just like normal mapPartition - */ - def mapPartitions[T, R: ClassTag](rdd: RDD[T], - mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { - - rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, - it, - mp)) - - } - - /** - * A simple enrichment of the traditional Spark Streaming DStream - * foreachPartition. - * - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * Note: Make sure to partition correctly to avoid memory issue when - * getting data from HBase - * - * @param dstream Original DStream with data to iterate over - * @param f Function to be given a iterator to iterate through - * the DStream values and a HConnection object to - * interact with HBase - * @return Returns a new DStream generated by the user - * definition function just like normal mapPartition - */ - def streamForeachPartition[T](dstream: DStream[T], - f: (Iterator[T], Connection) => Unit): Unit = { - - dstream.foreachRDD(rdd => this.foreachPartition(rdd, f)) - } - - /** - * A simple enrichment of the traditional Spark Streaming DStream - * mapPartition. - * - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. 
All HConnection - * management is handled outside this method - * - * Note: Make sure to partition correctly to avoid memory issue when - * getting data from HBase - * - * @param dstream Original DStream with data to iterate over - * @param f Function to be given a iterator to iterate through - * the DStream values and a HConnection object to - * interact with HBase - * @return Returns a new DStream generated by the user - * definition function just like normal mapPartition - */ - def streamMapPartitions[T, U: ClassTag](dstream: DStream[T], - f: (Iterator[T], Connection) => Iterator[U]): - DStream[U] = { - dstream.mapPartitions(it => hbaseMapPartition[T, U]( - broadcastedConf, - it, - f)) - } - - /** - * A simple abstraction over the HBaseContext.foreachPartition method. - * - * It allow addition support for a user to take RDD - * and generate puts and send them to HBase. - * The complexity of managing the HConnection is - * removed from the developer - * - * @param rdd Original RDD with data to iterate over - * @param tableName The name of the table to put into - * @param f Function to convert a value in the RDD to a HBase Put - */ - def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) { - - val tName = tableName.getName - rdd.foreachPartition( - it => hbaseForeachPartition[T]( - broadcastedConf, - it, - (iterator, connection) => { - val m = connection.getBufferedMutator(TableName.valueOf(tName)) - iterator.foreach(T => m.mutate(f(T))) - m.flush() - m.close() - })) - } - - def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ - credentials = SparkHadoopUtil.get.getCurrentUserCredentials() - - logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials) - - if (!appliedCredentials && credentials != null) { - appliedCredentials = true - - @transient val ugi = UserGroupInformation.getCurrentUser - ugi.addCredentials(credentials) - // specify that this is a proxy user - ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) - - ugi.addCredentials(credentialsConf.value.value) - } - } - - /** - * A simple abstraction over the HBaseContext.streamMapPartition method. - * - * It allow addition support for a user to take a DStream and - * generate puts and send them to HBase. - * - * The complexity of managing the HConnection is - * removed from the developer - * - * @param dstream Original DStream with data to iterate over - * @param tableName The name of the table to put into - * @param f Function to convert a value in - * the DStream to a HBase Put - */ - def streamBulkPut[T](dstream: DStream[T], - tableName: TableName, - f: (T) => Put) = { - val tName = tableName.getName - dstream.foreachRDD((rdd, time) => { - bulkPut(rdd, TableName.valueOf(tName), f) - }) - } - - /** - * A simple abstraction over the HBaseContext.foreachPartition method. - * - * It allow addition support for a user to take a RDD and generate delete - * and send them to HBase. The complexity of managing the HConnection is - * removed from the developer - * - * @param rdd Original RDD with data to iterate over - * @param tableName The name of the table to delete from - * @param f Function to convert a value in the RDD to a - * HBase Deletes - * @param batchSize The number of delete to batch before sending to HBase - */ - def bulkDelete[T](rdd: RDD[T], tableName: TableName, - f: (T) => Delete, batchSize: Integer) { - bulkMutation(rdd, tableName, f, batchSize) - } - - /** - * A simple abstraction over the HBaseContext.streamBulkMutation method. 
- * - * It allow addition support for a user to take a DStream and - * generate Delete and send them to HBase. - * - * The complexity of managing the HConnection is - * removed from the developer - * - * @param dstream Original DStream with data to iterate over - * @param tableName The name of the table to delete from - * @param f function to convert a value in the DStream to a - * HBase Delete - * @param batchSize The number of deletes to batch before sending to HBase - */ - def streamBulkDelete[T](dstream: DStream[T], - tableName: TableName, - f: (T) => Delete, - batchSize: Integer) = { - streamBulkMutation(dstream, tableName, f, batchSize) - } - - /** - * Under lining function to support all bulk mutations - * - * May be opened up if requested - */ - private def bulkMutation[T](rdd: RDD[T], tableName: TableName, - f: (T) => Mutation, batchSize: Integer) { - - val tName = tableName.getName - rdd.foreachPartition( - it => hbaseForeachPartition[T]( - broadcastedConf, - it, - (iterator, connection) => { - val table = connection.getTable(TableName.valueOf(tName)) - val mutationList = new java.util.ArrayList[Mutation] - iterator.foreach(T => { - mutationList.add(f(T)) - if (mutationList.size >= batchSize) { - table.batch(mutationList, null) - mutationList.clear() - } - }) - if (mutationList.size() > 0) { - table.batch(mutationList, null) - mutationList.clear() - } - table.close() - })) - } - - /** - * Under lining function to support all bulk streaming mutations - * - * May be opened up if requested - */ - private def streamBulkMutation[T](dstream: DStream[T], - tableName: TableName, - f: (T) => Mutation, - batchSize: Integer) = { - val tName = tableName.getName - dstream.foreachRDD((rdd, time) => { - bulkMutation(rdd, TableName.valueOf(tName), f, batchSize) - }) - } - - /** - * A simple abstraction over the HBaseContext.mapPartition method. - * - * It allow addition support for a user to take a RDD and generates a - * new RDD based on Gets and the results they bring back from HBase - * - * @param rdd Original RDD with data to iterate over - * @param tableName The name of the table to get from - * @param makeGet function to convert a value in the RDD to a - * HBase Get - * @param convertResult This will convert the HBase Result object to - * what ever the user wants to put in the resulting - * RDD - * return new RDD that is created by the Get to HBase - */ - def bulkGet[T, U: ClassTag](tableName: TableName, - batchSize: Integer, - rdd: RDD[T], - makeGet: (T) => Get, - convertResult: (Result) => U): RDD[U] = { - - val getMapPartition = new GetMapPartition(tableName, - batchSize, - makeGet, - convertResult) - - rdd.mapPartitions[U](it => - hbaseMapPartition[T, U]( - broadcastedConf, - it, - getMapPartition.run)) - } - - /** - * A simple abstraction over the HBaseContext.streamMap method. 
- * - * It allow addition support for a user to take a DStream and - * generates a new DStream based on Gets and the results - * they bring back from HBase - * - * @param tableName The name of the table to get from - * @param batchSize The number of Gets to be sent in a single batch - * @param dStream Original DStream with data to iterate over - * @param makeGet Function to convert a value in the DStream to a - * HBase Get - * @param convertResult This will convert the HBase Result object to - * what ever the user wants to put in the resulting - * DStream - * @return A new DStream that is created by the Get to HBase - */ - def streamBulkGet[T, U: ClassTag](tableName: TableName, - batchSize: Integer, - dStream: DStream[T], - makeGet: (T) => Get, - convertResult: (Result) => U): DStream[U] = { - - val getMapPartition = new GetMapPartition(tableName, - batchSize, - makeGet, - convertResult) - - dStream.mapPartitions[U](it => hbaseMapPartition[T, U]( - broadcastedConf, - it, - getMapPartition.run)) - } - - /** - * This function will use the native HBase TableInputFormat with the - * given scan object to generate a new RDD - * - * @param tableName the name of the table to scan - * @param scan the HBase scan object to use to read data from HBase - * @param f function to convert a Result object from HBase into - * what the user wants in the final generated RDD - * @return new RDD with results from scan - */ - def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, - f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { - - val job: Job = Job.getInstance(getConf(broadcastedConf)) - - TableMapReduceUtil.initCredentials(job) - TableMapReduceUtil.initTableMapperJob(tableName, scan, - classOf[IdentityTableMapper], null, null, job) - - sc.newAPIHadoopRDD(job.getConfiguration, - classOf[TableInputFormat], - classOf[ImmutableBytesWritable], - classOf[Result]).map(f) - } - - /** - * A overloaded version of HBaseContext hbaseRDD that defines the - * type of the resulting RDD - * - * @param tableName the name of the table to scan - * @param scans the HBase scan object to use to read data from HBase - * @return New RDD with results from scan - * - */ - def hbaseRDD(tableName: TableName, scans: Scan): - RDD[(ImmutableBytesWritable, Result)] = { - - hbaseRDD[(ImmutableBytesWritable, Result)]( - tableName, - scans, - (r: (ImmutableBytesWritable, Result)) => r) - } - - /** - * underlining wrapper all foreach functions in HBaseContext - */ - private def hbaseForeachPartition[T](configBroadcast: - Broadcast[SerializableWritable[Configuration]], - it: Iterator[T], - f: (Iterator[T], Connection) => Unit) = { - - val config = getConf(configBroadcast) - - applyCreds(configBroadcast) - // specify that this is a proxy user - val connection = ConnectionFactory.createConnection(config) - f(it, connection) - connection.close() - } - - private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): - Configuration = { - - if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) { - val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) - val inputStream = fs.open(new Path(tmpHdfsConfgFile)) - tmpHdfsConfiguration = new Configuration(false) - tmpHdfsConfiguration.readFields(inputStream) - inputStream.close() - } - - if (tmpHdfsConfiguration == null) { - try { - tmpHdfsConfiguration = configBroadcast.value.value - } catch { - case ex: Exception => logError("Unable to getConfig from broadcast", ex) - } - } - tmpHdfsConfiguration - } - - /** - * underlining wrapper all mapPartition functions 
in HBaseContext - * - */ - private def hbaseMapPartition[K, U]( - configBroadcast: - Broadcast[SerializableWritable[Configuration]], - it: Iterator[K], - mp: (Iterator[K], Connection) => - Iterator[U]): Iterator[U] = { - - val config = getConf(configBroadcast) - applyCreds(configBroadcast) - - val connection = ConnectionFactory.createConnection(config) - val res = mp(it, connection) - connection.close() - res - - } - - /** - * underlining wrapper all get mapPartition functions in HBaseContext - */ - private class GetMapPartition[T, U](tableName: TableName, - batchSize: Integer, - makeGet: (T) => Get, - convertResult: (Result) => U) - extends Serializable { - - val tName = tableName.getName - - def run(iterator: Iterator[T], connection: Connection): Iterator[U] = { - val table = connection.getTable(TableName.valueOf(tName)) - - val gets = new java.util.ArrayList[Get]() - var res = List[U]() - - while (iterator.hasNext) { - gets.add(makeGet(iterator.next())) - - if (gets.size() == batchSize) { - val results = table.get(gets) - res = res ++ results.map(convertResult) - gets.clear() - } - } - if (gets.size() > 0) { - val results = table.get(gets) - res = res ++ results.map(convertResult) - gets.clear() - } - table.close() - res.iterator - } - } - - /** - * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. - * - * This method is used to keep ClassTags out of the external Java API, as - * the Java compiler cannot produce them automatically. While this - * ClassTag-faking does please the compiler, it can cause problems at runtime - * if the Scala API relies on ClassTags for correctness. - * - * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, - * just worse performance or security issues. - * For instance, an Array of AnyRef can hold any type T, but may lose primitive - * specialization. - */ - private[spark] - def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] - - /** - * A Spark Implementation of HBase Bulk load - * - * This will take the content from an existing RDD then sort and shuffle - * it with respect to region splits. The result of that sort and shuffle - * will be written to HFiles. - * - * After this function is executed the user will have to call - * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase - * - * Also note this version of bulk load is different from past versions in - * that it includes the qualifier as part of the sort process. The - * reason for this is to be able to support rows will very large number - * of columns. 
- * - * @param rdd The RDD we are bulk loading from - * @param tableName The HBase table we are loading into - * @param flatMap A flapMap function that will make every - * row in the RDD - * into N cells for the bulk load - * @param stagingDir The location on the FileSystem to bulk load into - * @param familyHFileWriteOptionsMap Options that will define how the HFile for a - * column family is written - * @param compactionExclude Compaction excluded for the HFiles - * @param maxSize Max size for the HFiles before they roll - * @tparam T The Type of values in the original RDD - */ - def bulkLoad[T](rdd:RDD[T], - tableName: TableName, - flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], - stagingDir:String, - familyHFileWriteOptionsMap: - util.Map[Array[Byte], FamilyHFileWriteOptions] = - new util.HashMap[Array[Byte], FamilyHFileWriteOptions], - compactionExclude: Boolean = false, - maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): - Unit = { - val conn = ConnectionFactory.createConnection(config) - val regionLocator = conn.getRegionLocator(tableName) - val startKeys = regionLocator.getStartKeys - val defaultCompressionStr = config.get("hfile.compression", - Compression.Algorithm.NONE.getName) - val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr) -// HFileWriterImpl -// .compressionByName(defaultCompressionStr) - val now = System.currentTimeMillis() - val tableNameByteArray = tableName.getName - - val familyHFileWriteOptionsMapInternal = - new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] - - val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator() - - while (entrySetIt.hasNext) { - val entry = entrySetIt.next() - familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) - } - - /** - * This will return a new HFile writer when requested - * - * @param family column family - * @param conf configuration to connect to HBase - * @param favoredNodes nodes that we would like to write too - * @param fs FileSystem object where we will be writing the HFiles to - * @return WriterLength object - */ - def getNewWriter(family: Array[Byte], conf: Configuration, - favoredNodes: Array[InetSocketAddress], - fs:FileSystem, - familydir:Path): WriterLength = { - - - var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family)) - - if (familyOptions == null) { - familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString, - BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString) - familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions) - } - - val tempConf = new Configuration(conf) - tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) - val contextBuilder = new HFileContextBuilder() - .withCompression(Algorithm.valueOf(familyOptions.compression)) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) - .withBlockSize(familyOptions.blockSize) - contextBuilder.withDataBlockEncoding(DataBlockEncoding. 
- valueOf(familyOptions.dataBlockEncoding)) - val hFileContext = contextBuilder.build() - - if (null == favoredNodes) { - new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build()) -// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) - } else { - new WriterLength(0, - new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) - .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) - .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext) -// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) - .withFavoredNodes(favoredNodes).build()) - } - } - - val regionSplitPartitioner = - new BulkLoadPartitioner(startKeys) - - //This is where all the magic happens - //Here we are going to do the following things - // 1. FlapMap every row in the RDD into key column value tuples - // 2. Then we are going to repartition sort and shuffle - // 3. Finally we are going to write out our HFiles - rdd.flatMap( r => flatMap(r)). - repartitionAndSortWithinPartitions(regionSplitPartitioner). - hbaseForeachPartition(this, (it, conn) => { - - val conf = broadcastedConf.value.value - val fs = FileSystem.get(conf) - val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] - var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY - var rollOverRequested = false - - /** - * This will roll all writers - */ - def rollWriters(): Unit = { - writerMap.values.foreach( wl => { - if (wl.writer != null) { - logDebug("Writer=" + wl.writer.getPath + - (if (wl.written == 0) "" else ", wrote=" + wl.written)) - close(wl.writer) - } - }) - writerMap.clear() - rollOverRequested = false - } - - /** - * This function will close a given HFile writer - * @param w The writer to close - */ - def close(w:StoreFile.Writer): Unit = { - if (w != null) { - w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())) - w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) - w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)) - w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)) - w.appendTrackedTimestampsToMetadata() - w.close() - } - } - - //Here is where we finally iterate through the data in this partition of the - //RDD that has been sorted and partitioned - it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => - - //This will get a writer for the column family - //If there is no writer for a given column family then - //it will get created here. 
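For context on the shuffle step used a few lines above: repartitionAndSortWithinPartitions with a custom Partitioner is what guarantees each partition receives its cells already sorted, so the HFile writers can simply append. A minimal, generic illustration of that Spark primitive, assuming a SparkContext `sc`; the key type and split points are hypothetical and not the real BulkLoadPartitioner.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Toy partitioner: route keys to "regions" by their first character.
class FirstCharRangePartitioner(splits: Array[Char]) extends Partitioner {
  override def numPartitions: Int = splits.length + 1
  override def getPartition(key: Any): Int = {
    val c   = key.asInstanceOf[String].headOption.getOrElse('\u0000')
    val idx = splits.indexWhere(c < _)
    if (idx < 0) splits.length else idx
  }
}

val cells: RDD[(String, Array[Byte])] = sc.parallelize(Seq(
  "rowZ" -> Array[Byte](3), "rowA" -> Array[Byte](1), "rowM" -> Array[Byte](2)))

// One shuffle: each output partition holds one key range, already sorted by key.
val sortedPerRegion =
  cells.repartitionAndSortWithinPartitions(new FirstCharRangePartitioner(Array('n')))

val perRegion = sortedPerRegion.glom().collect()
perRegion.foreach(p => println(p.map(_._1).mkString(", ")))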
- val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), { - - val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family)) - - fs.mkdirs(familyDir) - - val loc:HRegionLocation = { - try { - val locator = - conn.getRegionLocator(TableName.valueOf(tableNameByteArray)) - locator.getRegionLocation(keyFamilyQualifier.rowKey) - } catch { - case e: Throwable => - logWarning("there's something wrong when locating rowkey: " + - Bytes.toString(keyFamilyQualifier.rowKey)) - null - } - } - if (null == loc) { - if (log.isTraceEnabled) { - logTrace("failed to get region location, so use default writer: " + - Bytes.toString(keyFamilyQualifier.rowKey)) - } - getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null, - fs = fs, familydir = familyDir) - } else { - if (log.isDebugEnabled) { - logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]") - } - val initialIsa = - new InetSocketAddress(loc.getHostname, loc.getPort) - if (initialIsa.isUnresolved) { - if (log.isTraceEnabled) { - logTrace("failed to resolve bind address: " + loc.getHostname + ":" - + loc.getPort + ", so use default writer") - } - getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir) - } else { - if(log.isDebugEnabled) { - logDebug("use favored nodes writer: " + initialIsa.getHostString) - } - getNewWriter(keyFamilyQualifier.family, conf, - Array[InetSocketAddress](initialIsa), fs, familyDir) - } - } - }) - - val keyValue =new KeyValue(keyFamilyQualifier.rowKey, - keyFamilyQualifier.family, - keyFamilyQualifier.qualifier, - now,cellValue) - - wl.writer.append(keyValue) - wl.written += keyValue.getLength - - rollOverRequested = rollOverRequested || wl.written > maxSize - - //This will only roll if we have at least one column family file that is - //bigger then maxSize and we have finished a given row key - if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { - rollWriters() - } - - previousRow = keyFamilyQualifier.rowKey - } - //We have finished all the data so lets close up the writers - rollWriters() - }) - } - - /** - * This is a wrapper class around StoreFile.Writer. 
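Putting the bulk-load pieces above together, here is a hedged sketch of how a driver program would call bulkLoad. It assumes an HBaseContext `hc` and a SparkContext `sc` already exist, and that KeyFamilyQualifier's constructor takes (rowKey, family, qualifier) byte arrays, matching the fields read in the method body; the table name and staging path are placeholders.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import spark.KeyFamilyQualifier   // package follows this loader's layout

val family    = Bytes.toBytes("cf1")   // placeholder column family
val qualifier = Bytes.toBytes("v")     // placeholder qualifier

val toLoad: RDD[(String, String)] =
  sc.parallelize(Seq("row1" -> "a", "row2" -> "b"))

hc.bulkLoad[(String, String)](
  toLoad,
  TableName.valueOf("test_table"),
  // One cell per input record; a real job may emit many cells per row.
  t => Iterator((new KeyFamilyQualifier(Bytes.toBytes(t._1), family, qualifier), Bytes.toBytes(t._2))),
  "/tmp/bulkload-staging")

// The HFiles written under the staging directory still have to be handed to
// LoadIncrementalHFiles.doBulkLoad(...) afterwards, as the comment above notes
// (the exact doBulkLoad signature depends on the HBase version in use).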
The reason for the - * wrapper is to keep the length of the file along side the writer - * - * @param written The writer to be wrapped - * @param writer The number of bytes written to the writer - */ - class WriterLength(var written:Long, val writer:StoreFile.Writer) - - /** - * This is a wrapper over a byte array so it can work as - * a key in a hashMap - * - * @param o1 The Byte Array value - */ - class ByteArrayWrapper (val o1:Array[Byte]) - extends Comparable[ByteArrayWrapper] with Serializable { - override def compareTo(o2: ByteArrayWrapper): Int = { - Bytes.compareTo(o1,o2.o1) - } - override def equals(o2: Any): Boolean = { - o2 match { - case wrapper: ByteArrayWrapper => - Bytes.equals(o1, wrapper.o1) - case _ => - false - } - } - override def hashCode():Int = { - Bytes.hashCode(o1) - } - } -} - -object LatestHBaseContextCache { - var latest:HBaseContext = null -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/HBaseDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/HBaseDStreamFunctions.scala b/loader/src/main/scala/spark/HBaseDStreamFunctions.scala deleted file mode 100644 index fc45865..0000000 --- a/loader/src/main/scala/spark/HBaseDStreamFunctions.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package spark - -import org.apache.hadoop.hbase.TableName -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.spark.streaming.dstream.DStream - -import scala.reflect.ClassTag - -/** - * HBaseDStreamFunctions contains a set of implicit functions that can be - * applied to a Spark DStream so that we can easily interact with HBase - */ -object HBaseDStreamFunctions { - - /** - * These are implicit methods for a DStream that contains any type of - * data. - * - * @param dStream This is for dStreams of any type - * @tparam T Type T - */ - implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) { - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * put. This will not return a new Stream. Think of it like a foreach - * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param f The function that will turn the DStream values - * into HBase Put objects. - */ - def hbaseBulkPut(hc: HBaseContext, - tableName: TableName, - f: (T) => Put): Unit = { - hc.streamBulkPut(dStream, tableName, f) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * get. This will return a new DStream. Think about it as a DStream map - * function. 
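On the ByteArrayWrapper defined above: Array[Byte] uses reference equality and identity hashCode, so two arrays with identical contents never match as HashMap keys, which is exactly what the wrapper fixes. A small demonstration follows; the local BytesKey class simply mirrors the wrapper for illustration.

import scala.collection.mutable
import org.apache.hadoop.hbase.util.Bytes

val raw = mutable.HashMap[Array[Byte], String]()
raw.put(Bytes.toBytes("cf1"), "writer-1")
raw.get(Bytes.toBytes("cf1"))              // None: a fresh array is a different key

// Local stand-in mirroring ByteArrayWrapper's content-based equals/hashCode.
class BytesKey(val bytes: Array[Byte]) {
  override def equals(o: Any): Boolean = o match {
    case other: BytesKey => Bytes.equals(bytes, other.bytes)
    case _               => false
  }
  override def hashCode(): Int = Bytes.hashCode(bytes)
}

val wrapped = mutable.HashMap[BytesKey, String]()
wrapped.put(new BytesKey(Bytes.toBytes("cf1")), "writer-1")
wrapped.get(new BytesKey(Bytes.toBytes("cf1")))   // Some("writer-1"): lookup by content works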
In that every DStream value will get a new value out of - * HBase. That new value will populate the newly generated DStream. - * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param batchSize How many gets to execute in a single batch - * @param f The function that will turn the RDD values - * in HBase Get objects - * @param convertResult The function that will convert a HBase - * Result object into a value that will go - * into the resulting DStream - * @tparam R The type of Object that will be coming - * out of the resulting DStream - * @return A resulting DStream with type R objects - */ - def hbaseBulkGet[R: ClassTag](hc: HBaseContext, - tableName: TableName, - batchSize:Int, f: (T) => Get, convertResult: (Result) => R): - DStream[R] = { - hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * get. This will return a new DStream. Think about it as a DStream map - * function. In that every DStream value will get a new value out of - * HBase. That new value will populate the newly generated DStream. - * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param batchSize How many gets to execute in a single batch - * @param f The function that will turn the RDD values - * in HBase Get objects - * @return A resulting DStream with type R objects - */ - def hbaseBulkGet(hc: HBaseContext, - tableName: TableName, batchSize:Int, - f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = { - hc.streamBulkGet[T, (ImmutableBytesWritable, Result)]( - tableName, batchSize, dStream, f, - result => (new ImmutableBytesWritable(result.getRow), result)) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * Delete. This will not return a new DStream. - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param tableName The tableName that the deletes will be sent to - * @param f The function that will convert the DStream value into - * a HBase Delete Object - * @param batchSize The number of Deletes to be sent in a single batch - */ - def hbaseBulkDelete(hc: HBaseContext, - tableName: TableName, - f:(T) => Delete, batchSize:Int): Unit = { - hc.streamBulkDelete(dStream, tableName, f, batchSize) - } - - /** - * Implicit method that gives easy access to HBaseContext's - * foreachPartition method. This will ack very much like a normal DStream - * foreach method but for the fact that you will now have a HBase connection - * while iterating through the values. - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param f This function will get an iterator for a Partition of an - * DStream along with a connection object to HBase - */ - def hbaseForeachPartition(hc: HBaseContext, - f: (Iterator[T], Connection) => Unit): Unit = { - hc.streamForeachPartition(dStream, f) - } - - /** - * Implicit method that gives easy access to HBaseContext's - * mapPartitions method. 
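A hedged example of wiring the implicit bulk get above into a streaming job. It assumes a DStream of row keys and an HBaseContext built elsewhere; the `spark` package matches this loader's layout, and the table, family and qualifier names are placeholders.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream
import spark.{HBaseContext, HBaseDStreamFunctions}
import HBaseDStreamFunctions._

def enrich(rowKeys: DStream[String], hc: HBaseContext): DStream[(String, Option[String])] =
  rowKeys.hbaseBulkGet[(String, Option[String])](
    hc,
    TableName.valueOf("test_table"),
    100,                                             // batchSize: gets per multi-get round trip
    (key: String) => new Get(Bytes.toBytes(key)),
    result => {
      val v = Option(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("v"))).map(Bytes.toString)
      (Bytes.toString(result.getRow), v)
    })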
This will act very much like a normal DStream - * map partitions method but for the fact that you will now have an - * HBase connection while iterating through the values - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param f This function will get an iterator for a Partition of a - * DStream along with a connection object to HBase - * @tparam R This is the type of objects that will go into the resulting - * DStream - * @return A resulting DStream of type R - */ - def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, - f: (Iterator[T], Connection) => Iterator[R]): - DStream[R] = { - hc.streamMapPartitions(dStream, f) - } - } -}
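And a matching sketch for the mapPartitions flavour just described, under the same assumptions: one Table handle per partition, with results materialized before the handle is closed so the returned iterator never touches a closed table. Names are placeholders.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream
import spark.HBaseContext
import spark.HBaseDStreamFunctions._

def cellCounts(rowKeys: DStream[String], hc: HBaseContext): DStream[(String, Int)] =
  rowKeys.hbaseMapPartitions[(String, Int)](hc, (it, connection) => {
    val table = connection.getTable(TableName.valueOf("test_table"))
    val counted = it.map { key =>
      val result = table.get(new Get(Bytes.toBytes(key)))
      (key, result.size())                  // number of cells returned for this row
    }.toList                                // force evaluation before closing the table
    table.close()
    counted.iterator
  })

Forcing evaluation with .toList trades a little memory for safety here, because the underlying wrapper closes the connection as soon as the user function returns.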
