http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala new file mode 100644 index 0000000..ee4c338 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/HFileMRGenerator.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.loader + +import java.util.UUID + +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hbase.{HConstants, KeyValue} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.io.compress.Compression +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.mapreduce.{GraphHFileOutputFormat, HFileOutputFormat2} +import org.apache.hadoop.hbase.regionserver.BloomType +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileInputFormat} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} +import org.apache.hadoop.mapreduce.{Job, Mapper} +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +object HFileMRGenerator extends RawFileGenerator { + val DefaultBlockSize = 32768 + val DefaultConfig = Map( + "yarn.app.mapreduce.am.resource.mb" -> 4096, + "mapred.child.java.opts" -> "=-Xmx8g", + "mapreduce.job.running.map.limit" -> 200, + "mapreduce.job.running.reduce.limit" -> 200, + "mapreduce.reduce.memory.mb" -> 8192, + "mapreduce.reduce.java.opts" -> "-Xmx6144m", + "mapreduce.map.memory.mb" -> "4096", + "mapreduce.map.java.opts" -> "-Xmx3072m", + "mapreduce.task.io.sort.factor" -> 10 + ) + + class HFileMapper extends Mapper[ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue] { + override def map(key: ImmutableBytesWritable, value: ImmutableBytesWritable, + context: Mapper[ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue]#Context): Unit = { + val keyValue = new KeyValue(value.get(), 0, value.get().length) + context.write(new ImmutableBytesWritable(keyValue.getRow), keyValue) + } + } + + def setMROptions(conf: Configuration) = { + DefaultConfig.foreach { case (k, v) => + conf.set(k, 
conf.get(k, v.toString)) + } + } + + def getStartKeys(numRegions: Int): Seq[ImmutableBytesWritable] = { + val startKey = AsynchbaseStorageManagement.getStartKey(numRegions) + val endKey = AsynchbaseStorageManagement.getEndKey(numRegions) + if (numRegions < 3) { + throw new IllegalArgumentException("Must create at least three regions") + } + else if (Bytes.compareTo(startKey, endKey) >= 0) { + throw new IllegalArgumentException("Start key must be smaller than end key") + } + val empty = new Array[Byte](0) + val results = if (numRegions == 3) { + Seq(empty, startKey, endKey) + } else { + val splitKeys: Array[Array[Byte]] = Bytes.split(startKey, endKey, numRegions - 3) + if (splitKeys == null || splitKeys.length != numRegions - 1) { + throw new IllegalArgumentException("Unable to split key range into enough regions") + } + Seq(empty) ++ splitKeys.toSeq + } + results.map(new ImmutableBytesWritable(_)) + } + + def sortKeyValues(hbaseConf: Configuration, + tableName: String, + input: String, + output: String, + partitionSize: Int, + compressionAlgorithm: String) = { + val job = Job.getInstance(hbaseConf, s"sort-MR $tableName") + job.setJarByClass(getClass) + job.setInputFormatClass(classOf[SequenceFileInputFormat[ImmutableBytesWritable, ImmutableBytesWritable]]) + job.setMapperClass(classOf[HFileMapper]) + job.setOutputKeyClass(classOf[ImmutableBytesWritable]) + job.setOutputValueClass(classOf[KeyValue]) + job.setOutputFormatClass(classOf[HFileOutputFormat2]) + job.getConfiguration.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, s"/tmp/${tableName}_staging") + FileInputFormat.addInputPaths(job, input) + FileOutputFormat.setOutputPath(job, new Path(output)) + + + import scala.collection.JavaConversions._ + + GraphHFileOutputFormat.configureIncrementalLoad(job, + getStartKeys(partitionSize), + Seq("e", "v"), + Compression.getCompressionAlgorithmByName(compressionAlgorithm.toLowerCase), + BloomType.ROW, + DefaultBlockSize, + DataBlockEncoding.FAST_DIFF + ) + job + } + + def transfer(sc: SparkContext, + s2Config: Config, + input: RDD[String], + graphFileOptions: GraphFileOptions): RDD[KeyValue] = { + HFileGenerator.transfer(sc, s2Config, input, graphFileOptions) + } + + override def generate(sc: SparkContext, + config: Config, + rdd: RDD[String], + options: GraphFileOptions) = { + val merged = transfer(sc, config, rdd, options) + val tmpOutput = options.tempDir + + merged.map(x => (new ImmutableBytesWritable(x.getKey), new ImmutableBytesWritable(x.getBuffer))) + .saveAsNewAPIHadoopFile(tmpOutput, + classOf[ImmutableBytesWritable], + classOf[ImmutableBytesWritable], + classOf[SequenceFileOutputFormat[ImmutableBytesWritable, ImmutableBytesWritable]]) + + sc.killExecutors((1 to sc.defaultParallelism).map(_.toString)) + + val conf = sc.hadoopConfiguration + + conf.setClassLoader(Thread.currentThread().getContextClassLoader()) + setMROptions(conf) + + val job = sortKeyValues(conf, options.tableName, tmpOutput, options.output, + options.numRegions, options.compressionAlgorithm) + + val success = job.waitForCompletion(true) + FileSystem.get(conf).delete(new Path(tmpOutput), true) + if (!success) { + throw new RuntimeException("mapreduce failed") + } + } + +}
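For context, the generator added above is typically driven from a Spark job: raw bulk-load lines are transferred into HBase KeyValues, staged as a SequenceFile under options.tempDir, and then sorted into region-split HFiles by the MapReduce job built in sortKeyValues. The sketch below shows one plausible invocation; the constructor of GraphFileOptions is not part of this diff, so its shape (and the sample paths, table name, and region count) are assumptions for illustration only.

// Sketch only: GraphFileOptions' constructor is not shown in this diff; the field
// names below mirror the accesses in generate() (tableName, tempDir, output,
// numRegions, compressionAlgorithm) and are assumptions.
import com.typesafe.config.ConfigFactory
import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileMRGenerator}
import org.apache.spark.{SparkConf, SparkContext}

object BulkLoadJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("s2graph-hfile-bulkload"))
    val s2Config = ConfigFactory.load()

    // One edge per line in s2graph's bulk-load TSV format
    // (timestamp, operation, "e", from, to, label, props, [direction]).
    val input = sc.textFile("hdfs:///tmp/s2graph/bulkload/input")

    // Hypothetical options value; adjust to the real GraphFileOptions definition.
    val options = GraphFileOptions(
      tableName = "s2graph",
      output = "hdfs:///tmp/s2graph/bulkload/hfiles",
      tempDir = s"hdfs:///tmp/s2graph/bulkload/stage-${System.currentTimeMillis()}",
      numRegions = 120,
      compressionAlgorithm = "lz4")

    // Converts the lines to KeyValues, stages them as a SequenceFile under
    // options.tempDir, then runs the sort/HFile MapReduce job defined above,
    // writing HFiles to options.output.
    HFileMRGenerator.generate(sc, s2Config, input, options)

    sc.stop()
  }
}

As with any HFile-based bulk load, the files written under options.output would still need to be handed to HBase (for example via LoadIncrementalHFiles, as the bulkLoad scaladoc further down notes) before they become visible in the table.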
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala new file mode 100644 index 0000000..1613f20 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/loader/RawFileGenerator.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs.loader + +import com.typesafe.config.Config +import org.apache.s2graph.core.GraphUtil +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) + +trait RawFileGenerator { + def generate(sc: SparkContext, + config:Config, + rdd: RDD[String], + _options:GraphFileOptions) + + def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = { + for { + msg <- msgs + tokens = GraphUtil.split(msg) + if tokens(2) == "e" || tokens(2) == "edge" + tempDirection = if (tokens.length == 7) "out" else tokens(7) + direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + reverseDirection = if (direction == "out") "in" else "out" + convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) + (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4)) + degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction) + degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection) + extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil + output <- List(degreeKey -> 1L) ++ extra + } yield output + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala new file mode 100644 index 0000000..4ecce80 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/BulkLoadPartitioner.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.s2jobs.spark + +import java.util +import java.util.Comparator + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.Partitioner + +/** + * A Partitioner implementation that will separate records to different + * HBase Regions based on region splits + * + * @param startKeys The start keys for the given table + */ +class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) + extends Partitioner { + + override def numPartitions: Int = startKeys.length + + override def getPartition(key: Any): Int = { + + val rowKey:Array[Byte] = + key match { + case qualifier: KeyFamilyQualifier => + qualifier.rowKey + case _ => + key.asInstanceOf[Array[Byte]] + } + + val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { + override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { + Bytes.compareTo(o1, o2) + } + } + val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) + if (partition < 0) partition * -1 + -2 + else partition + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala new file mode 100644 index 0000000..ea69c53 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/FamilyHFileWriteOptions.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.s2graph.s2jobs.spark + +import java.io.Serializable + +/** + * This object will hold optional data for how a given column family's + * writer will work + * + * @param compression String to define the Compression to be used in the HFile + * @param bloomType String to define the bloom type to be used in the HFile + * @param blockSize The block size to be used in the HFile + * @param dataBlockEncoding String to define the data block encoding to be used + * in the HFile + */ +class FamilyHFileWriteOptions( val compression:String, + val bloomType: String, + val blockSize: Int, + val dataBlockEncoding: String) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala new file mode 100644 index 0000000..d6f6708 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseContext.scala @@ -0,0 +1,850 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.s2graph.s2jobs.spark + +import java.io._ +import java.net.InetSocketAddress +import java.util + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.fs.HFileSystem +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.io.compress.Compression +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder} +import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableInputFormat, TableMapReduceUtil} +import org.apache.hadoop.hbase.regionserver.{BloomType, HStore, StoreFile} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod +import org.apache.s2graph.s2jobs.spark.HBaseRDDFunctions._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.{SerializableWritable, SparkContext} + +import scala.collection.mutable +import scala.reflect.ClassTag + +/** + * HBaseContext is a façade for HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBaseContext will take the responsibilities + * of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + */ +class HBaseContext(@transient private val sc: SparkContext, + @transient private val config: Configuration, + val tmpHdfsConfgFile: String = null) + extends Serializable { + + @transient var credentials = UserGroupInformation.getCurrentUser().getCredentials + @transient var tmpHdfsConfiguration:Configuration = config + @transient var appliedCredentials = false + @transient val job = Job.getInstance(config) + TableMapReduceUtil.initCredentials(job) + val broadcastedConf = sc.broadcast(new SerializableWritable(config)) + val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) + + LatestHBaseContextCache.latest = this + + if (tmpHdfsConfgFile != null && config != null) { + val fs = FileSystem.newInstance(config) + val tmpPath = new Path(tmpHdfsConfgFile) + if (!fs.exists(tmpPath)) { + val outputStream = fs.create(tmpPath) + config.write(outputStream) + outputStream.close() + } else { +// logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") + } + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], Connection) => Unit):Unit = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit):Unit = { + dstream.foreachRDD((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R: ClassTag](rdd: RDD[T], + mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + + rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, + it, + mp)) + + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * foreachPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamForeachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit): Unit = { + + dstream.foreachRDD(rdd => this.foreachPartition(rdd, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMapPartitions[T, U: ClassTag](dstream: DStream[T], + f: (Iterator[T], Connection) => Iterator[U]): + DStream[U] = { + dstream.mapPartitions(it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + f)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + */ + def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tName)) + iterator.foreach(T => m.mutate(f(T))) + m.flush() + m.close() + })) + } + + def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ + credentials = UserGroupInformation.getCurrentUser().getCredentials + +// logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials) + + if (!appliedCredentials && credentials != null) { + appliedCredentials = true + + @transient val ugi = UserGroupInformation.getCurrentUser + ugi.addCredentials(credentials) + // specify that this is a proxy user + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) + + ugi.addCredentials(credentialsConf.value.value) + } + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the DStream to a HBase Put + */ + def streamBulkPut[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Put) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkPut(rdd, TableName.valueOf(tName), f) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the RDD to a + * HBase Deletes + * @param batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableName: TableName, + f: (T) => Delete, batchSize: Integer) { + bulkMutation(rdd, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. 
+ * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f function to convert a value in the DStream to a + * HBase Delete + * @param batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Delete, + batchSize: Integer) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableName: TableName, + f: (T) => Mutation, batchSize: Integer) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val table = connection.getTable(TableName.valueOf(tName)) + val mutationList = new java.util.ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + table.batch(mutationList, null) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + table.batch(mutationList, null) + mutationList.clear() + } + table.close() + })) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Mutation, + batchSize: Integer) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkMutation(rdd, TableName.valueOf(tName), f, batchSize) + }) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return new RDD that is created by the Get to HBase + */ + def bulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => + hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize The number of Gets to be sent in a single batch + * @param dStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * @return A new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + dStream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + dStream.mapPartitions[U](it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return new RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, + f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + val job: Job = Job.getInstance(getConf(broadcastedConf)) + + TableMapReduceUtil.initCredentials(job) + TableMapReduceUtil.initTableMapperJob(tableName, scan, + classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration, + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that defines the + * type of the resulting RDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, scans: Scan): + RDD[(ImmutableBytesWritable, Result)] = { + + hbaseRDD[(ImmutableBytesWritable, Result)]( + tableName, + scans, + (r: (ImmutableBytesWritable, Result)) => r) + } + + /** + * underlining wrapper all foreach functions in HBaseContext + */ + private def hbaseForeachPartition[T](configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], Connection) => Unit) = { + + val config = getConf(configBroadcast) + + applyCreds(configBroadcast) + // specify that this is a proxy user + val connection = ConnectionFactory.createConnection(config) + f(it, connection) + connection.close() + } + + private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): + Configuration = { + + if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) { + val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) + val inputStream = fs.open(new Path(tmpHdfsConfgFile)) + tmpHdfsConfiguration = new Configuration(false) + tmpHdfsConfiguration.readFields(inputStream) + inputStream.close() + } + + if (tmpHdfsConfiguration == null) { + try { + tmpHdfsConfiguration = configBroadcast.value.value + } catch { + case ex: Exception => +// logError("Unable to getConfig from broadcast", ex) + } + } + tmpHdfsConfiguration + } + + /** + * underlining wrapper all mapPartition 
functions in HBaseContext + * + */ + private def hbaseMapPartition[K, U]( + configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[K], + mp: (Iterator[K], Connection) => + Iterator[U]): Iterator[U] = { + + val config = getConf(configBroadcast) + applyCreds(configBroadcast) + + val connection = ConnectionFactory.createConnection(config) + val res = mp(it, connection) + connection.close() + res + + } + + /** + * underlining wrapper all get mapPartition functions in HBaseContext + */ + private class GetMapPartition[T, U](tableName: TableName, + batchSize: Integer, + makeGet: (T) => Get, + convertResult: (Result) => U) + extends Serializable { + + val tName = tableName.getName + + def run(iterator: Iterator[T], connection: Connection): Iterator[U] = { + val table = connection.getTable(TableName.valueOf(tName)) + + val gets = new java.util.ArrayList[Get]() + var res = List[U]() + + while (iterator.hasNext) { + gets.add(makeGet(iterator.next())) + + if (gets.size() == batchSize) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + } + if (gets.size() > 0) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + table.close() + res.iterator + } + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as + * the Java compiler cannot produce them automatically. While this + * ClassTag-faking does please the compiler, it can cause problems at runtime + * if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array of AnyRef can hold any type T, but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + /** + * A Spark Implementation of HBase Bulk load + * + * This will take the content from an existing RDD then sort and shuffle + * it with respect to region splits. The result of that sort and shuffle + * will be written to HFiles. + * + * After this function is executed the user will have to call + * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase + * + * Also note this version of bulk load is different from past versions in + * that it includes the qualifier as part of the sort process. The + * reason for this is to be able to support rows will very large number + * of columns. 
+ * + * @param rdd The RDD we are bulk loading from + * @param tableName The HBase table we are loading into + * @param startKeys + * @param flatMap A flapMap function that will make every + * row in the RDD + * into N cells for the bulk load + * @param stagingDir The location on the FileSystem to bulk load into + * @param familyHFileWriteOptionsMap Options that will define how the HFile for a + * column family is written + * @param compactionExclude Compaction excluded for the HFiles + * @param maxSize Max size for the HFiles before they roll + * @tparam T The Type of values in the original RDD + */ + def bulkLoad[T](rdd:RDD[T], + tableName: TableName, + startKeys: Array[Array[Byte]], + flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], + stagingDir:String, + familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], FamilyHFileWriteOptions], + compactionExclude: Boolean = false, + maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): + Unit = { +// val conn = ConnectionFactory.createConnection(config) +// val regionLocator = conn.getRegionLocator(tableName) +// val startKeys = regionLocator.getStartKeys + val defaultCompressionStr = config.get("hfile.compression", + Compression.Algorithm.NONE.getName) + val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr) +// HFileWriterImpl +// .compressionByName(defaultCompressionStr) + val now = System.currentTimeMillis() + val tableNameByteArray = tableName.getName + + val familyHFileWriteOptionsMapInternal = + new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] + + val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator() + + while (entrySetIt.hasNext) { + val entry = entrySetIt.next() + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) + } + + /* + * This will return a new HFile writer when requested + * + * @param family column family + * @param conf configuration to connect to HBase + * @param favoredNodes nodes that we would like to write too + * @param fs FileSystem object where we will be writing the HFiles to + * @return WriterLength object + */ + def getNewWriter(family: Array[Byte], conf: Configuration, + favoredNodes: Array[InetSocketAddress], + fs:FileSystem, + familydir:Path): WriterLength = { + + + var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family)) + + if (familyOptions == null) { + familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString, + BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString) + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions) + } + + val tempConf = new Configuration(conf) + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) + val contextBuilder = new HFileContextBuilder() + .withCompression(Algorithm.valueOf(familyOptions.compression)) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(familyOptions.blockSize) + contextBuilder.withDataBlockEncoding(DataBlockEncoding. 
+ valueOf(familyOptions.dataBlockEncoding)) + val hFileContext = contextBuilder.build() + + if (null == favoredNodes) { + new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build()) +// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) + } else { + new WriterLength(0, + new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext) +// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build()) + } + } + + val regionSplitPartitioner = + new BulkLoadPartitioner(startKeys) + + //This is where all the magic happens + //Here we are going to do the following things + // 1. FlapMap every row in the RDD into key column value tuples + // 2. Then we are going to repartition sort and shuffle + // 3. Finally we are going to write out our HFiles + rdd.flatMap( r => flatMap(r)). + repartitionAndSortWithinPartitions(regionSplitPartitioner). + hbaseForeachPartition(this, (it, conn) => { + + val conf = broadcastedConf.value.value + val fs = FileSystem.get(conf) + val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] + var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY + var rollOverRequested = false + + /* + * This will roll all writers + */ + def rollWriters(): Unit = { + writerMap.values.foreach( wl => { + if (wl.writer != null) { +// logDebug("Writer=" + wl.writer.getPath + +// (if (wl.written == 0) "" else ", wrote=" + wl.written)) + close(wl.writer) + } + }) + writerMap.clear() + rollOverRequested = false + } + + /* + * This function will close a given HFile writer + * @param w The writer to close + */ + def close(w:StoreFile.Writer): Unit = { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())) + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)) + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)) + w.appendTrackedTimestampsToMetadata() + w.close() + } + } + + //Here is where we finally iterate through the data in this partition of the + //RDD that has been sorted and partitioned + it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => + + //This will get a writer for the column family + //If there is no writer for a given column family then + //it will get created here. 
+ val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), { + + val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family)) + + fs.mkdirs(familyDir) + + val loc:HRegionLocation = { + try { + val locator = + conn.getRegionLocator(TableName.valueOf(tableNameByteArray)) + locator.getRegionLocation(keyFamilyQualifier.rowKey) + } catch { + case e: Throwable => +// logWarning("there's something wrong when locating rowkey: " + +// Bytes.toString(keyFamilyQualifier.rowKey)) + null + } + } + if (null == loc) { +// if (log.isTraceEnabled) { +// logTrace("failed to get region location, so use default writer: " + +// Bytes.toString(keyFamilyQualifier.rowKey)) +// } + getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null, + fs = fs, familydir = familyDir) + } else { +// if (log.isDebugEnabled) { +// logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]") +// } + val initialIsa = + new InetSocketAddress(loc.getHostname, loc.getPort) + if (initialIsa.isUnresolved) { +// if (log.isTraceEnabled) { +// logTrace("failed to resolve bind address: " + loc.getHostname + ":" +// + loc.getPort + ", so use default writer") +// } + getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir) + } else { +// if(log.isDebugEnabled) { +// logDebug("use favored nodes writer: " + initialIsa.getHostString) +// } + getNewWriter(keyFamilyQualifier.family, conf, + Array[InetSocketAddress](initialIsa), fs, familyDir) + } + } + }) + + val keyValue =new KeyValue(keyFamilyQualifier.rowKey, + keyFamilyQualifier.family, + keyFamilyQualifier.qualifier, + now,cellValue) + + wl.writer.append(keyValue) + wl.written += keyValue.getLength + + rollOverRequested = rollOverRequested || wl.written > maxSize + + //This will only roll if we have at least one column family file that is + //bigger then maxSize and we have finished a given row key + if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { + rollWriters() + } + + previousRow = keyFamilyQualifier.rowKey + } + //We have finished all the data so lets close up the writers + rollWriters() + }) + } + + /** + * This is a wrapper class around StoreFile.Writer. 
The reason for the + * wrapper is to keep the length of the file along side the writer + * + * @param written The writer to be wrapped + * @param writer The number of bytes written to the writer + */ + class WriterLength(var written:Long, val writer:StoreFile.Writer) + + /** + * This is a wrapper over a byte array so it can work as + * a key in a hashMap + * + * @param o1 The Byte Array value + */ + class ByteArrayWrapper (val o1:Array[Byte]) + extends Comparable[ByteArrayWrapper] with Serializable { + override def compareTo(o2: ByteArrayWrapper): Int = { + Bytes.compareTo(o1,o2.o1) + } + override def equals(o2: Any): Boolean = { + o2 match { + case wrapper: ByteArrayWrapper => + Bytes.equals(o1, wrapper.o1) + case _ => + false + } + } + override def hashCode():Int = { + Bytes.hashCode(o1) + } + } +} + +object LatestHBaseContextCache { + var latest:HBaseContext = null +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala new file mode 100644 index 0000000..9faaecd --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseDStreamFunctions.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.s2graph.s2jobs.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * HBaseDStreamFunctions contains a set of implicit functions that can be + * applied to a Spark DStream so that we can easily interact with HBase + */ +object HBaseDStreamFunctions { + + /** + * These are implicit methods for a DStream that contains any type of + * data. + * + * @param dStream This is for dStreams of any type + * @tparam T Type T + */ + implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) { + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * put. This will not return a new Stream. Think of it like a foreach + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param f The function that will turn the DStream values + * into HBase Put objects. 
+ */ + def hbaseBulkPut(hc: HBaseContext, + tableName: TableName, + f: (T) => Put): Unit = { + hc.streamBulkPut(dStream, tableName, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new DStream. Think about it as a DStream map + * function. In that every DStream value will get a new value out of + * HBase. That new value will populate the newly generated DStream. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @param convertResult The function that will convert a HBase + * Result object into a value that will go + * into the resulting DStream + * @tparam R The type of Object that will be coming + * out of the resulting DStream + * @return A resulting DStream with type R objects + */ + def hbaseBulkGet[R: ClassTag](hc: HBaseContext, + tableName: TableName, + batchSize:Int, f: (T) => Get, convertResult: (Result) => R): + DStream[R] = { + hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new DStream. Think about it as a DStream map + * function. In that every DStream value will get a new value out of + * HBase. That new value will populate the newly generated DStream. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @return A resulting DStream with type R objects + */ + def hbaseBulkGet(hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = { + hc.streamBulkGet[T, (ImmutableBytesWritable, Result)]( + tableName, batchSize, dStream, f, + result => (new ImmutableBytesWritable(result.getRow), result)) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * Delete. This will not return a new DStream. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param tableName The tableName that the deletes will be sent to + * @param f The function that will convert the DStream value into + * a HBase Delete Object + * @param batchSize The number of Deletes to be sent in a single batch + */ + def hbaseBulkDelete(hc: HBaseContext, + tableName: TableName, + f:(T) => Delete, batchSize:Int): Unit = { + hc.streamBulkDelete(dStream, tableName, f, batchSize) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * foreachPartition method. This will ack very much like a normal DStream + * foreach method but for the fact that you will now have a HBase connection + * while iterating through the values. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * DStream along with a connection object to HBase + */ + def hbaseForeachPartition(hc: HBaseContext, + f: (Iterator[T], Connection) => Unit): Unit = { + hc.streamForeachPartition(dStream, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * mapPartitions method. 
This will ask very much like a normal DStream + * map partitions method but for the fact that you will now have a + * HBase connection while iterating through the values + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * DStream along with a connection object to HBase + * @tparam R This is the type of objects that will go into the resulting + * DStream + * @return A resulting DStream of type R + */ + def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, + f: (Iterator[T], Connection) => Iterator[R]): + DStream[R] = { + hc.streamMapPartitions(dStream, f) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala new file mode 100644 index 0000000..328587c --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/HBaseRDDFunctions.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.s2jobs.spark + +import java.util + +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.{HConstants, TableName} +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +/** + * HBaseRDDFunctions contains a set of implicit functions that can be + * applied to a Spark RDD so that we can easily interact with HBase + */ +object HBaseRDDFunctions +{ + + /** + * These are implicit methods for a RDD that contains any type of + * data. + * + * @param rdd This is for rdd of any type + * @tparam T This is any type + */ + implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) { + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * put. This will not return a new RDD. Think of it like a foreach + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param f The function that will turn the RDD values + * into HBase Put objects. + */ + def hbaseBulkPut(hc: HBaseContext, + tableName: TableName, + f: (T) => Put): Unit = { + hc.bulkPut(rdd, tableName, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new RDD. Think about it as a RDD map + * function. In that every RDD value will get a new value out of + * HBase. That new value will populate the newly generated RDD. 
+ * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @param convertResult The function that will convert a HBase + * Result object into a value that will go + * into the resulting RDD + * @tparam R The type of Object that will be coming + * out of the resulting RDD + * @return A resulting RDD with type R objects + */ + def hbaseBulkGet[R: ClassTag](hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get, convertResult: (Result) => R): RDD[R] = { + hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * get. This will return a new RDD. Think about it as a RDD map + * function. In that every RDD value will get a new value out of + * HBase. That new value will populate the newly generated RDD. + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param batchSize How many gets to execute in a single batch + * @param f The function that will turn the RDD values + * in HBase Get objects + * @return A resulting RDD with type R objects + */ + def hbaseBulkGet(hc: HBaseContext, + tableName: TableName, batchSize:Int, + f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = { + hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName, + batchSize, rdd, f, + result => if (result != null && result.getRow != null) { + (new ImmutableBytesWritable(result.getRow), result) + } else { + null + }) + } + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * Delete. This will not return a new RDD. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param tableName The tableName that the deletes will be sent to + * @param f The function that will convert the RDD value into + * a HBase Delete Object + * @param batchSize The number of Deletes to be sent in a single batch + */ + def hbaseBulkDelete(hc: HBaseContext, + tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = { + hc.bulkDelete(rdd, tableName, f, batchSize) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * foreachPartition method. This will ack very much like a normal RDD + * foreach method but for the fact that you will now have a HBase connection + * while iterating through the values. + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * RDD along with a connection object to HBase + */ + def hbaseForeachPartition(hc: HBaseContext, + f: (Iterator[T], Connection) => Unit): Unit = { + hc.foreachPartition(rdd, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * mapPartitions method. 
This will ask very much like a normal RDD + * map partitions method but for the fact that you will now have a + * HBase connection while iterating through the values + * + * @param hc The hbaseContext object to identify which HBase + * cluster connection to use + * @param f This function will get an iterator for a Partition of an + * RDD along with a connection object to HBase + * @tparam R This is the type of objects that will go into the resulting + * RDD + * @return A resulting RDD of type R + */ + def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, + f: (Iterator[T], Connection) => Iterator[R]): + RDD[R] = { + hc.mapPartitions[T,R](rdd, f) + } + + /** + * Implicit method that gives easy access to HBaseContext's + * bulkLoad method. + * + * A Spark Implementation of HBase Bulk load + * + * This will take the content from an existing RDD then sort and shuffle + * it with respect to region splits. The result of that sort and shuffle + * will be written to HFiles. + * + * After this function is executed the user will have to call + * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase + * + * Also note this version of bulk load is different from past versions in + * that it includes the qualifier as part of the sort process. The + * reason for this is to be able to support rows will very large number + * of columns. + * + * @param tableName The HBase table we are loading into + * @param flatMap A flapMap function that will make every row in the RDD + * into N cells for the bulk load + * @param stagingDir The location on the FileSystem to bulk load into + * @param familyHFileWriteOptionsMap Options that will define how the HFile for a + * column family is written + * @param compactionExclude Compaction excluded for the HFiles + * @param maxSize Max size for the HFiles before they roll + */ + def hbaseBulkLoad(hc: HBaseContext, + tableName: TableName, + startKeys: Array[Array[Byte]], + flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], + stagingDir:String, + familyHFileWriteOptionsMap: + util.Map[Array[Byte], FamilyHFileWriteOptions] = + new util.HashMap[Array[Byte], FamilyHFileWriteOptions](), + compactionExclude: Boolean = false, + maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = { + hc.bulkLoad(rdd, tableName, startKeys, + flatMap, stagingDir, familyHFileWriteOptionsMap, + compactionExclude, maxSize) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala new file mode 100644 index 0000000..d2dc7d3 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/JavaHBaseContext.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.s2jobs.spark + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction} +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.streaming.api.java.JavaDStream + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +/** + * This is the Java Wrapper over HBaseContext which is written in + * Scala. This class will be used by developers that want to + * work with Spark or Spark Streaming in Java + * + * @param jsc This is the JavaSparkContext that we will wrap + * @param config This is the config information to out HBase cluster + */ +class JavaHBaseContext(@transient private val jsc: JavaSparkContext, + @transient private val config: Configuration) extends Serializable { + val hbaseContext = new HBaseContext(jsc.sc, config) + + /** + * A simple enrichment of the traditional Spark javaRdd foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaRdd Original javaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](javaRdd: JavaRDD[T], + f: VoidFunction[(java.util.Iterator[T], Connection)]) = { + + hbaseContext.foreachPartition(javaRdd.rdd, + (it: Iterator[T], conn: Connection) => { + f.call((it, conn)) + }) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaDstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](javaDstream: JavaDStream[T], + f: VoidFunction[(Iterator[T], Connection)]) = { + hbaseContext.foreachPartition(javaDstream.dstream, + (it: Iterator[T], conn: Connection) => f.call((it, conn))) + } + + /** + * A simple enrichment of the traditional Spark JavaRDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
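A rough Scala sketch of driving this wrapper: it builds a JavaHBaseContext from an existing SparkContext sc and uses foreachPartition to write one cell per input row. The cluster Configuration, the table test_table, and the family/qualifier names are assumptions for the example.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{Connection, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
    import org.apache.spark.api.java.function.VoidFunction

    val jsc = JavaSparkContext.fromSparkContext(sc)      // wrap the existing SparkContext
    val hbaseConf = HBaseConfiguration.create()          // assumed to point at the cluster
    val javaHBaseContext = new JavaHBaseContext(jsc, hbaseConf)

    val rowKeys: JavaRDD[String] =
      jsc.parallelize(java.util.Arrays.asList("row-1", "row-2"))

    javaHBaseContext.foreachPartition(rowKeys,
      new VoidFunction[(java.util.Iterator[String], Connection)] {
        override def call(pair: (java.util.Iterator[String], Connection)): Unit = {
          val (it, connection) = pair
          val table = connection.getTable(TableName.valueOf("test_table"))  // hypothetical table
          while (it.hasNext) {
            val rowKey = it.next()
            table.put(new Put(Bytes.toBytes(rowKey))
              .addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes("v")))
          }
          table.close()  // close the Table, but never the Connection itself
        }
      })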
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaRdd Original JavaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R](javaRdd: JavaRDD[T], + f: FlatMapFunction[(java.util.Iterator[T], + Connection), R]): JavaRDD[R] = { + + def fn = (it: Iterator[T], conn: Connection) => + asScalaIterator( + f.call((asJavaIterator(it), conn)) + ) + + JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd, + (iterator: Iterator[T], connection: Connection) => + fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) + } + + /** + * A simple enrichment of the traditional Spark Streaming JavaDStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaDstream Original JavaDStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + * @return Returns a new JavaDStream generated by the user + * definition function just like normal mapPartition + */ + def streamMap[T, U](javaDstream: JavaDStream[T], + mp: Function[(Iterator[T], Connection), Iterator[U]]): + JavaDStream[U] = { + JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream, + (it: Iterator[T], conn: Connection) => + mp.call((it, conn)))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take JavaRDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the JavaRDD + * to a HBase Put + */ + def bulkPut[T](javaRdd: JavaRDD[T], + tableName: TableName, + f: Function[(T), Put]) { + + hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a JavaDStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the JavaDStream to a HBase Put + */ + def streamBulkPut[T](javaDstream: JavaDStream[T], + tableName: TableName, + f: Function[T, Put]) = { + hbaseContext.streamBulkPut(javaDstream.dstream, + tableName, + (t: T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a JavaRDD and + * generate delete and send them to HBase. 
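For comparison, bulkPut handles the table and connection plumbing itself. The sketch below reuses the hypothetical jsc and javaHBaseContext from the previous example and turns each (rowKey, value) pair into a Put; table, family, and qualifier names are again assumptions.

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.function.Function

    // Sample (rowKey, value) records to write.
    val records: JavaRDD[(String, String)] =
      jsc.parallelize(java.util.Arrays.asList(("row-1", "value-1"), ("row-2", "value-2")))

    javaHBaseContext.bulkPut(records,
      TableName.valueOf("test_table"),            // hypothetical table
      new Function[(String, String), Put] {
        override def call(record: (String, String)): Put = {
          val (rowKey, value) = record
          new Put(Bytes.toBytes(rowKey))
            .addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
        }
      })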
+ * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaRDD to a + * HBase Deletes + * @param batchSize The number of deletes to batch before sending to HBase + */ + def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName, + f: Function[T, Delete], batchSize: Integer) { + hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a JavaDStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDStream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaDStream to a + * HBase Delete + * @param batchSize The number of deletes to be sent at once + */ + def streamBulkDelete[T](javaDStream: JavaDStream[T], + tableName: TableName, + f: Function[T, Delete], + batchSize: Integer) = { + hbaseContext.streamBulkDelete(javaDStream.dstream, tableName, + (t: T) => f.call(t), + batchSize) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a JavaRDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize batch size of how many gets to retrieve in a single fetch + * @param javaRdd Original JavaRDD with data to iterate over + * @param makeGet Function to convert a value in the JavaRDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaRDD + * @return New JavaRDD that is created by the Get to HBase + */ + def bulkGet[T, U](tableName: TableName, + batchSize: Integer, + javaRdd: JavaRDD[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]): JavaRDD[U] = { + + JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName, + batchSize, + javaRdd.rdd, + (t: T) => makeGet.call(t), + (r: Result) => { + convertResult.call(r) + })(fakeClassTag[U]))(fakeClassTag[U]) + + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
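A short sketch of the Java-facing bulkGet, again written in Scala and reusing the hypothetical jsc and javaHBaseContext from above: each row key becomes a Get, and each Result is reduced back to its row key string.

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Get, Result}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.api.java.function.Function

    val keys: JavaRDD[String] =
      jsc.parallelize(java.util.Arrays.asList("row-1", "row-2"))

    val fetched: JavaRDD[String] = javaHBaseContext.bulkGet[String, String](
      TableName.valueOf("test_table"),            // hypothetical table
      100,                                        // gets per batch
      keys,
      new Function[String, Get] {
        override def call(rowKey: String): Get = new Get(Bytes.toBytes(rowKey))
      },
      new Function[Result, String] {
        override def call(result: Result): String = Bytes.toString(result.getRow)
      })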
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + + * @param tableName The name of the table to get from + * @param batchSize The number of gets to be batched together + * @param javaDStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the JavaDStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaDStream + * @return New JavaDStream that is created by the Get to HBase + */ + def streamBulkGet[T, U](tableName: TableName, + batchSize: Integer, + javaDStream: JavaDStream[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]) { + JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName, + batchSize, + javaDStream.dstream, + (t: T) => makeGet.call(t), + (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new JavaRDD + * + * @param tableName The name of the table to scan + * @param scans The HBase scan object to use to read data from HBase + * @param f Function to convert a Result object from HBase into + * What the user wants in the final generated JavaRDD + * @return New JavaRDD with results from scan + */ + def hbaseRDD[U](tableName: TableName, + scans: Scan, + f: Function[(ImmutableBytesWritable, Result), U]): + JavaRDD[U] = { + JavaRDD.fromRDD( + hbaseContext.hbaseRDD[U](tableName, + scans, + (v: (ImmutableBytesWritable, Result)) => + f.call((v._1, v._2)))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that define the + * type of the resulting JavaRDD + * + * @param tableName The name of the table to scan + * @param scans The HBase scan object to use to read data from HBase + * @return New JavaRDD with results from scan + */ + def hbaseRDD(tableName: TableName, + scans: Scan): + JavaRDD[(ImmutableBytesWritable, Result)] = { + JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans)) + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as the Java compiler + * cannot produce them automatically. While this ClassTag-faking does please the compiler, + * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array[AnyRef] can hold any type T, + * but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala new file mode 100644 index 0000000..1e05340 --- /dev/null +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/spark/KeyFamilyQualifier.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
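To read an entire table (or a key range) rather than individual rows, the scan-based hbaseRDD overload can be used. A minimal sketch, once more reusing the hypothetical javaHBaseContext and table name:

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.api.java.JavaRDD

    val scan = new Scan()
    scan.setCaching(100)                          // rows fetched per RPC; tune per workload

    val scanned: JavaRDD[(ImmutableBytesWritable, Result)] =
      javaHBaseContext.hbaseRDD(TableName.valueOf("test_table"), scan)   // hypothetical table

    println(s"scanned ${scanned.count()} rows")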
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.s2jobs.spark + +import java.io.Serializable + +import org.apache.hadoop.hbase.util.Bytes + +/** + * This is the key to be used for sorting and shuffling. + * + * We will only partition on the rowKey but we will sort on all three + * + * @param rowKey Record RowKey + * @param family Record ColumnFamily + * @param qualifier Cell Qualifier + */ +class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte]) + extends Comparable[KeyFamilyQualifier] with Serializable { + override def compareTo(o: KeyFamilyQualifier): Int = { + var result = Bytes.compareTo(rowKey, o.rowKey) + if (result == 0) { + result = Bytes.compareTo(family, o.family) + if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier) + } + result + } + override def toString: String = { + Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/db7f0191/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala new file mode 100644 index 0000000..6e89466 --- /dev/null +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/S2GraphHelperTest.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.s2jobs + +class S2GraphHelperTest { + +}
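The KeyFamilyQualifier comparator added above sorts primarily on the row key and only falls back to the family and then the qualifier on ties, which is what lets the bulk load partition on row key while still producing fully sorted cells. A small illustrative check:

    import org.apache.hadoop.hbase.util.Bytes

    // Same row and family, so ordering is decided by the qualifier.
    val a = new KeyFamilyQualifier(Bytes.toBytes("row-1"), Bytes.toBytes("e"), Bytes.toBytes("q1"))
    val b = new KeyFamilyQualifier(Bytes.toBytes("row-1"), Bytes.toBytes("e"), Bytes.toBytes("q2"))

    assert(a.compareTo(b) < 0)   // "q1" sorts before "q2"
    println(a)                   // prints "row-1:e:q1"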
