http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala new file mode 100644 index 0000000..edfc635 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/JavaHBaseContext.scala @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.loader.spark + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction} +import org.apache.spark.streaming.api.java.JavaDStream + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +/** + * This is the Java Wrapper over HBaseContext which is written in + * Scala. This class will be used by developers that want to + * work with Spark or Spark Streaming in Java + * + * @param jsc This is the JavaSparkContext that we will wrap + * @param config This is the config information to out HBase cluster + */ +class JavaHBaseContext(@transient jsc: JavaSparkContext, + @transient config: Configuration) extends Serializable { + val hbaseContext = new HBaseContext(jsc.sc, config) + + /** + * A simple enrichment of the traditional Spark javaRdd foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param javaRdd Original javaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](javaRdd: JavaRDD[T], + f: VoidFunction[(java.util.Iterator[T], Connection)]) = { + + hbaseContext.foreachPartition(javaRdd.rdd, + (it: Iterator[T], conn: Connection) => { + f.call((it, conn)) + }) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param javaDstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](javaDstream: JavaDStream[T], + f: VoidFunction[(Iterator[T], Connection)]) = { + hbaseContext.foreachPartition(javaDstream.dstream, + (it: Iterator[T], conn: Connection) => f.call(it, conn)) + } + + /** + * A simple enrichment of the traditional Spark JavaRDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaRdd Original JavaRdd with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R](javaRdd: JavaRDD[T], + f: FlatMapFunction[(java.util.Iterator[T], + Connection), R]): JavaRDD[R] = { + + def fn = (it: Iterator[T], conn: Connection) => + asScalaIterator( + f.call((asJavaIterator(it), conn)).iterator() + ) + + JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd, + (iterator: Iterator[T], connection: Connection) => + fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) + } + + /** + * A simple enrichment of the traditional Spark Streaming JavaDStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param javaDstream Original JavaDStream with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the JavaDStream values and a HConnection object to + * interact with HBase + * @return Returns a new JavaDStream generated by the user + * definition function just like normal mapPartition + */ + def streamMap[T, U](javaDstream: JavaDStream[T], + mp: Function[(Iterator[T], Connection), Iterator[U]]): + JavaDStream[U] = { + JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream, + (it: Iterator[T], conn: Connection) => + mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take JavaRDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the JavaRDD + * to a HBase Put + */ + def bulkPut[T](javaRdd: JavaRDD[T], + tableName: TableName, + f: Function[(T), Put]) { + + hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. 
+ * + * It allow addition support for a user to take a JavaDStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the JavaDStream to a HBase Put + */ + def streamBulkPut[T](javaDstream: JavaDStream[T], + tableName: TableName, + f: Function[T, Put]) = { + hbaseContext.streamBulkPut(javaDstream.dstream, + tableName, + (t: T) => f.call(t)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a JavaRDD and + * generate delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaRdd Original JavaRDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaRDD to a + * HBase Deletes + * @param batchSize The number of deletes to batch before sending to HBase + */ + def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName, + f: Function[T, Delete], batchSize: Integer) { + hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. + * + * It allow addition support for a user to take a JavaDStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param javaDStream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the JavaDStream to a + * HBase Delete + * @param batchSize The number of deletes to be sent at once + */ + def streamBulkDelete[T](javaDStream: JavaDStream[T], + tableName: TableName, + f: Function[T, Delete], + batchSize: Integer) = { + hbaseContext.streamBulkDelete(javaDStream.dstream, tableName, + (t: T) => f.call(t), + batchSize) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a JavaRDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize batch size of how many gets to retrieve in a single fetch + * @param javaRdd Original JavaRDD with data to iterate over + * @param makeGet Function to convert a value in the JavaRDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaRDD + * @return New JavaRDD that is created by the Get to HBase + */ + def bulkGet[T, U](tableName: TableName, + batchSize: Integer, + javaRdd: JavaRDD[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]): JavaRDD[U] = { + + JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName, + batchSize, + javaRdd.rdd, + (t: T) => makeGet.call(t), + (r: Result) => { + convertResult.call(r) + })(fakeClassTag[U]))(fakeClassTag[U]) + + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + + * @param tableName The name of the table to get from + * @param batchSize The number of gets to be batched together + * @param javaDStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the JavaDStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * JavaDStream + * @return New JavaDStream that is created by the Get to HBase + */ + def streamBulkGet[T, U](tableName: TableName, + batchSize: Integer, + javaDStream: JavaDStream[T], + makeGet: Function[T, Get], + convertResult: Function[Result, U]) { + JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName, + batchSize, + javaDStream.dstream, + (t: T) => makeGet.call(t), + (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new JavaRDD + * + * @param tableName The name of the table to scan + * @param scans The HBase scan object to use to read data from HBase + * @param f Function to convert a Result object from HBase into + * What the user wants in the final generated JavaRDD + * @return New JavaRDD with results from scan + */ + def hbaseRDD[U](tableName: TableName, + scans: Scan, + f: Function[(ImmutableBytesWritable, Result), U]): + JavaRDD[U] = { + JavaRDD.fromRDD( + hbaseContext.hbaseRDD[U](tableName, + scans, + (v: (ImmutableBytesWritable, Result)) => + f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U]) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that define the + * type of the resulting JavaRDD + * + * @param tableName The name of the table to scan + * @param scans The HBase scan object to use to read data from HBase + * @return New JavaRDD with results from scan + */ + def hbaseRDD(tableName: TableName, + scans: Scan): + JavaRDD[(ImmutableBytesWritable, Result)] = { + JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans)) + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as the Java compiler + * cannot produce them automatically. While this ClassTag-faking does please the compiler, + * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array[AnyRef] can hold any type T, + * but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +}
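For context, the Scala HBaseContext that this wrapper delegates to is typically driven as in the following minimal sketch. It assumes a local Spark context, a ZooKeeper quorum on localhost, and an existing HBase table named "test" with a column family "cf"; those names and the sample RDD are illustrative and not part of this patch.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.s2graph.loader.spark.HBaseContext
    import org.apache.spark.{SparkConf, SparkContext}

    object BulkPutSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("bulkPut-sketch").setMaster("local[2]"))

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "localhost") // illustrative quorum

        // HBaseContext owns the Connection; callers never open or close it themselves.
        val hbaseContext = new HBaseContext(sc, hbaseConf)

        val rows = sc.parallelize(Seq("row1" -> "value1", "row2" -> "value2"))

        // Each record is converted to a Put and written through a shared per-partition
        // connection, which is the behavior JavaHBaseContext.bulkPut exposes to Java callers.
        hbaseContext.bulkPut[(String, String)](
          rows,
          TableName.valueOf("test"), // table is assumed to already exist
          { case (rowKey, value) =>
            val put = new Put(Bytes.toBytes(rowKey))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value))
            put
          })

        sc.stop()
      }
    }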
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala new file mode 100644 index 0000000..d7c6277 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/KeyFamilyQualifier.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.s2graph.loader.spark + +import java.io.Serializable + +import org.apache.hadoop.hbase.util.Bytes + +/** + * This is the key to be used for sorting and shuffling. + * + * We will only partition on the rowKey but we will sort on all three + * + * @param rowKey Record RowKey + * @param family Record ColumnFamily + * @param qualifier Cell Qualifier + */ +class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte]) + extends Comparable[KeyFamilyQualifier] with Serializable { + override def compareTo(o: KeyFamilyQualifier): Int = { + var result = Bytes.compareTo(rowKey, o.rowKey) + if (result == 0) { + result = Bytes.compareTo(family, o.family) + if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier) + } + result + } + override def toString: String = { + Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala new file mode 100644 index 0000000..7838593 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -0,0 +1,335 @@ +package org.apache.s2graph.loader.subscriber + +import com.typesafe.config.{Config, ConfigFactory} +import kafka.javaapi.producer.Producer +import kafka.producer.KeyedMessage +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.client._ +import org.apache.s2graph.core._ +import org.apache.s2graph.spark.spark.WithKafka +import org.apache.spark.{Accumulable, SparkContext} +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap +import scala.concurrent.ExecutionContext + +object GraphConfig { + var database = "" + var zkQuorum = "" + var kafkaBrokers = "" + var cacheTTL = s"${60 * 60 * 1}" + def apply(phase: String, dbUrl: 
Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]): Config = { + database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev") + zkQuorum = zkAddr.getOrElse("localhost") + +// val newConf = new util.HashMap[String, Object]() +// newConf.put("hbase.zookeeper.quorum", zkQuorum) +// newConf.put("db.default.url", database) +// newConf.put("kafka.metadata.broker.list", kafkaBrokers) + val newConf = + if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL) + else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL) + + ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig) + } +} + +object GraphSubscriberHelper extends WithKafka { + + + type HashMapAccumulable = Accumulable[HashMap[String, Long], (String, Long)] + + + lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers)) + var config: Config = _ + private val writeBufferSize = 1024 * 1024 * 8 + private val sleepPeriod = 10000 + private val maxTryNum = 10 + + var g: Graph = null + var management: Management = null + val conns = new scala.collection.mutable.HashMap[String, Connection]() + + def toOption(s: String) = { + s match { + case "" | "none" => None + case _ => Some(s) + } + } + + def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = { + config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList)) + + if (g == null) { + val ec = ExecutionContext.Implicits.global + g = new Graph(config)(ec) + management = new Management(g) + } + } + + def getConn(zkQuorum: String): Connection = { + conns.getOrElseUpdate(zkQuorum, { + val hbaseConf = HBaseConfiguration.create() + hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) + ConnectionFactory.createConnection(hbaseConf) + }) + conns(zkQuorum) + } + // def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit = { + // Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global) + // } + def report(key: String, value: Option[String], topic: String = "report") = { + val msg = Seq(Some(key), value).flatten.mkString("\t") + val kafkaMsg = new KeyedMessage[String, String](topic, msg) + producer.send(kafkaMsg) + } + + def toGraphElements(msgs: Seq[String], labelMapping: Map[String, String] = Map.empty) + (statFunc: (String, Int) => Unit): Iterable[GraphElement] = { + (for (msg <- msgs) yield { + statFunc("total", 1) + Graph.toGraphElement(msg, labelMapping) match { + case Some(e) if e.isInstanceOf[Edge] => + statFunc("EdgeParseOk", 1) + e.asInstanceOf[Edge] + case Some(v) if v.isInstanceOf[Vertex] => + statFunc("VertexParseOk", 1) + v.asInstanceOf[Vertex] + case Some(x) => + throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}") + case None => + throw new RuntimeException(s"GraphSubscriber.toGraphElements: parsing failed. 
$msg") + } + + }).toList + } + +// private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int) +// (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = { +// if (tryNum <= 0) { +// statFunc("errorStore", elementsSize) +// throw new RuntimeException(s"retry failed after $maxTryNum") +// } +// val conn = getConn(zkQuorum) +// val mutator = conn.getBufferedMutator(TableName.valueOf(tableName)) +// // val table = conn.getTable(TableName.valueOf(tableName)) +// // table.setAutoFlush(false, false) +// +// try { +// puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) } +// mutator.mutate(puts) +// // table.put(puts) +// statFunc(s"$statPrefix:storeOk", elementsSize) +// } catch { +// case e: Throwable => +// e.printStackTrace() +// Thread.sleep(sleepPeriod) +// storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc) +// } finally { +// mutator.close() +// // table.close() +// } +// } +// +// def storeDegreeBulk(zkQuorum: String, tableName: String) +// (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty) +// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { +// val counts = HashMap[String, Long]() +// val statFunc = storeStat(counts)(mapAccOpt) _ +// +// for { +// (vertexId, labelName, direction, degreeVal) <- degrees +// incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal) +// } { +// storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree") +// } +// counts +// } +// def storeBulk(zkQuorum: String, tableName: String) +// (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false) +// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { +// +// val counts = HashMap[String, Long]() +// val statFunc = storeStat(counts)(mapAccOpt) _ +// val elements = toGraphElements(msgs, labelMapping)(statFunc) +// +// val puts = elements.flatMap { element => +// element match { +// case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") => +// v.buildPuts() +// case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") => +// EdgeWriter(e).insertBulkForLoader(autoCreateEdge) +// case _ => Nil +// } +// } toList +// +// storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc) +// counts +// } + + def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = { + counts.put(key, counts.getOrElse(key, 0L) + value) + mapAccOpt match { + case None => + case Some(mapAcc) => mapAcc += (key -> value) + } + } + + def toLabelMapping(lableMapping: String): Map[String, String] = { + (for { + token <- lableMapping.split(",") + inner = token.split(":") if inner.length == 2 + } yield { + (inner.head, inner.last) + }).toMap + } + + def isValidQuorum(quorum: String) = { + quorum.split(",").size > 1 + } +} + +//object GraphSubscriber extends SparkApp with WithKafka { +// val sleepPeriod = 5000 +// val usages = +// s""" +// |/** +// |* this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API. +// |* params: +// |* 0. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix +// |* 1. dbUrl: jdbc database connection string to specify database for meta. +// |* 2. 
labelMapping: oldLabel:newLabel delimited by , +// |* 3. zkQuorum: target hbase zkQuorum where this job will publish data to. +// |* 4. hTableName: target hbase physical table name where this job will publish data to. +// |* 5. batchSize: how many edges will be batched for Put request to target hbase. +// |* 6. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka. +// |* 7. kafkaTopic: fallback queue topic. +// |* 8. edgeAutoCreate: true if need to create reversed edge automatically. +// |* +// |* after this job finished, s2graph will have data with sequence corresponding newLabelName. +// |* change this newLabelName to ogirinalName if you want to online replace of label. +// |* +// |*/ +// """.stripMargin +// +// override def run() = { +// /** +// * Main function +// */ +// println(args.toList) +//// if (args.length != 10) { +//// System.err.println(usages) +//// System.exit(1) +//// } +// val hdfsPath = args(0) +// val dbUrl = args(1) +// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2)) +// +// val zkQuorum = args(3) +// val hTableName = args(4) +// val batchSize = args(5).toInt +// val kafkaBrokerList = args(6) +// val kafkaTopic = args(7) +// val edgeAutoCreate = args(8).toBoolean +// val vertexDegreePathOpt = if (args.length >= 10) GraphSubscriberHelper.toOption(args(9)) else None +// +// val conf = sparkConf(s"$hdfsPath: GraphSubscriber") +// val sc = new SparkContext(conf) +// val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _)) +// +// +// if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new RuntimeException(s"$zkQuorum is not valid.") +// +// /** this job expect only one hTableName. all labels in this job will be stored in same physical hbase table */ +// try { +// +// import GraphSubscriberHelper._ +// // set local driver setting. +// val phase = System.getProperty("phase") +// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) +// +// /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */ +// +// if (labelMapping.isEmpty) { +// // pass +// } else { +// for { +// (oldLabelName, newLabelName) <- labelMapping +// } { +// Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName)) +// } +// } +// +// vertexDegreePathOpt.foreach { vertexDegreePath => +// val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => line.split("\t").length == 4).map { line => +// val tokens = line.split("\t") +// (tokens(0), tokens(1), tokens(2), tokens(3).toInt) +// } +// vertexDegrees.foreachPartition { partition => +// +// // init Graph +// val phase = System.getProperty("phase") +// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) +// +// partition.grouped(batchSize).foreach { msgs => +// try { +// val start = System.currentTimeMillis() +// val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc)) +// for ((k, v) <- counts) { +// mapAcc +=(k, v) +// } +// val duration = System.currentTimeMillis() - start +// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") +// } catch { +// case e: Throwable => +// println(s"[Failed]: store $e") +// +// msgs.foreach { msg => +// GraphSubscriberHelper.report(msg.toString(), Some(e.getMessage()), topic = kafkaTopic) +// } +// } +// } +// } +// } +// +// +// val msgs = sc.textFile(hdfsPath) +// msgs.foreachPartition(partition => { +// // set executor setting. 
+// val phase = System.getProperty("phase") +// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) +// +// partition.grouped(batchSize).foreach { msgs => +// try { +// val start = System.currentTimeMillis() +// // val counts = +// // GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc)) +// val counts = +// GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping, edgeAutoCreate)(Some(mapAcc)) +// +// for ((k, v) <- counts) { +// mapAcc +=(k, v) +// } +// val duration = System.currentTimeMillis() - start +// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") +// } catch { +// case e: Throwable => +// println(s"[Failed]: store $e") +// +// msgs.foreach { msg => +// GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic) +// } +// } +// } +// }) +// +// logInfo(s"counter: $mapAcc") +// println(s"Stats: ${mapAcc}") +// +// } catch { +// case e: Throwable => +// println(s"job failed with exception: $e") +// throw e +// } +// +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala new file mode 100644 index 0000000..60b43ca --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -0,0 +1,194 @@ +package org.apache.s2graph.loader.subscriber + +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat} +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.regionserver.BloomType +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId, LabelWithDirection} +import org.apache.s2graph.loader.spark.{KeyFamilyQualifier, HBaseContext, FamilyHFileWriteOptions} +import org.apache.s2graph.spark.spark.SparkApp +import org.apache.spark.{SparkContext} +import org.apache.spark.rdd.RDD +import org.hbase.async.{PutRequest} +import play.api.libs.json.Json +import scala.collection.JavaConversions._ + + +object TransferToHFile extends SparkApp with JSONParser { + + val usages = + s""" + |create HFiles for hbase table on zkQuorum specified. + |note that hbase table is created already and pre-splitted properly. + | + |params: + |0. input: hdfs path for tsv file(bulk format). + |1. output: hdfs path for storing HFiles. + |2. zkQuorum: running hbase cluster zkQuorum. + |3. tableName: table name for this bulk upload. + |4. dbUrl: db url for parsing to graph element. + """.stripMargin + + //TODO: Process AtomicIncrementRequest too. 
+ /** build key values */ + case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) + + private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = { + val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) + buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e => + e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) } + } + } + + def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = { + for { + msg <- msgs + tokens = GraphUtil.split(msg) + if tokens(2) == "e" || tokens(2) == "edge" + tempDirection = if (tokens.length == 7) "out" else tokens(7) + direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + reverseDirection = if (direction == "out") "in" else "out" + convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) + (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4)) + degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction) + degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection) + extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil + output <- List(degreeKey -> 1L) ++ extra + } yield output + } + def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = { + val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList + kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } + } + def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = { + val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList + kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } + } + def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { + val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB.")) + val dir = GraphUtil.directions(direction) + val innerVal = jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse { + throw new RuntimeException(s"$vertexId can not be converted into innerval") + } + val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal)) + + val ts = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) + val labelWithDir = LabelWithDirection(label.id.get, dir) + val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs) + + edge.edgesWithIndex.flatMap { indexEdge => + GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => + new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) + } + } + } + + def toKeyValues(degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = { + val kvs = for { + (key, value) <- degreeKeyVals + putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value) + } yield { + val p = putRequest + val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) + kv + } + kvs.toIterator + } + + def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { + val kvs = for { + s <- strs + element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge] + edge = 
element.asInstanceOf[Edge] + putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate) + } yield { + val p = putRequest + val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) + + // println(s"[Edge]: $edge\n[Put]: $p\n[KeyValue]: ${kv.getRow.toList}, ${kv.getQualifier.toList}, ${kv.getValue.toList}, ${kv.getTimestamp}") + + kv + } + kvs.toIterator + } + + + override def run() = { + val input = args(0) + val tmpPath = args(1) + val zkQuorum = args(2) + val tableName = args(3) + val dbUrl = args(4) + val maxHFilePerResionServer = args(5).toInt + val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String] + val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false + val buildDegree = if (args.length >= 9) args(8).toBoolean else true + val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4" + val conf = sparkConf(s"$input: TransferToHFile") + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryoserializer.buffer.mb", "24") + + val sc = new SparkContext(conf) + + GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) + + /** set up hbase init */ + val hbaseConf = HBaseConfiguration.create() + hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) + hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) + hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName") + + + val rdd = sc.textFile(input) + + + val kvs = rdd.mapPartitions { iter => + val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") + toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate) + } + // + // val newRDD = if (!buildDegree) new HFileRDD(kvs) + // else { + // val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => + // agg + current + // }.mapPartitions { iter => + // val phase = System.getProperty("phase") + // GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") + // toKeyValues(iter.toSeq) + // } + // new HFileRDD(kvs ++ degreeKVs) + // } + // + // newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath) + val merged = if (!buildDegree) kvs + else { + kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => + agg + current + }.mapPartitions { iter => + val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") + toKeyValues(iter.toSeq) + } + } + + val hbaseSc = new HBaseContext(sc, hbaseConf) + def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = { + val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier()) + val v = kv.getValue() + Seq((k -> v)).toIterator + } + val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase, + BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase) + val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions) + + hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala 
b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala new file mode 100644 index 0000000..3b54e1c --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala @@ -0,0 +1,90 @@ +package org.apache.s2graph.loader.subscriber + +import kafka.producer.KeyedMessage +import kafka.serializer.StringDecoder +import org.apache.s2graph.core.Graph +import org.apache.s2graph.spark.spark.{WithKafka, SparkApp} +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.HasOffsetRanges +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.language.postfixOps + +object WalLogStat extends SparkApp with WithKafka { + + override def run() = { + + validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "statTopic") + + val kafkaZkQuorum = args(0) + val brokerList = args(1) + val topics = args(2) + val intervalInSec = seconds(args(3).toLong) + val dbUrl = args(4) + val statTopic = args(5) + + + val conf = sparkConf(s"$topics: ${getClass.getSimpleName}") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val groupId = topics.replaceAll(",", "_") + "_stat" + + val kafkaParams = Map( + "zookeeper.connect" -> kafkaZkQuorum, + "group.id" -> groupId, + "metadata.broker.list" -> brokerList, + "zookeeper.connection.timeout.ms" -> "10000", + "auto.offset.reset" -> "largest") + + val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) + val statProducer = getProducer[String, String](brokerList) + + stream.foreachRDD { (rdd, time) => + + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val ts = time.milliseconds + + val elements = rdd.mapPartitions { partition => + // set executor setting. 
+ val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) + partition.map { case (key, msg) => + Graph.toGraphElement(msg) match { + case Some(elem) => + val serviceName = elem.serviceName + msg.split("\t", 7) match { + case Array(_, operation, log_type, _, _, label, _*) => + Seq(serviceName, label, operation, log_type).mkString("\t") + case _ => + Seq("no_service_name", "no_label", "no_operation", "parsing_error").mkString("\t") + } + case None => + Seq("no_service_name", "no_label", "no_operation", "no_element_error").mkString("\t") + } + } + } + + val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect() + val totalCount = countByKey.map(_._2).sum + val keyedMessage = countByKey.map { case (key, value) => + new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount") + } + + statProducer.send(keyedMessage: _*) + + elements.mapPartitionsWithIndex { (i, part) => + // commit offset range + val osr = offsets(i) + getStreamHelper(kafkaParams).commitConsumerOffset(osr) + Iterator.empty + }.foreach { + (_: Nothing) => () + } + + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala new file mode 100644 index 0000000..f51a148 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala @@ -0,0 +1,148 @@ +package org.apache.s2graph.loader.subscriber + +import java.text.SimpleDateFormat +import java.util.Date +import kafka.serializer.StringDecoder +import org.apache.s2graph.core.Graph +import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.HasOffsetRanges + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.language.postfixOps + +object WalLogToHDFS extends SparkApp with WithKafka { + + override def run() = { + + validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath") + + val kafkaZkQuorum = args(0) + val brokerList = args(1) + val topics = args(2) + val intervalInSec = seconds(args(3).toLong) + val dbUrl = args(4) + val outputPath = args(5) + val hiveDatabase = args(6) + val hiveTable = args(7) + val splitListPath = args(8) + + val conf = sparkConf(s"$topics: WalLogToHDFS") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val groupId = topics.replaceAll(",", "_") + "_stream" + val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" + + val kafkaParams = Map( + "zookeeper.connect" -> kafkaZkQuorum, + "group.id" -> groupId, + "metadata.broker.list" -> brokerList, + "zookeeper.connection.timeout.ms" -> "10000", + "auto.offset.reset" -> "largest") + + val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) + + val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) + + val hdfsBlockSize = 134217728 // 128M + val hiveContext = new HiveContext(sc) + var splits = Array[String]() 
+ var excludeLabels = Set[String]() + var excludeServices = Set[String]() + stream.foreachRDD { (rdd, time) => + try { + val read = sc.textFile(splitListPath).collect().map(_.split("=")).flatMap { + case Array(value) => Some(("split", value)) + case Array(key, value) => Some((key, value)) + case _ => None + } + splits = read.filter(_._1 == "split").map(_._2) + excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet + excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet + } catch { + case _: Throwable => // use previous information + } + + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val elements = rdd.mapPartitions { partition => + // set executor setting. + val phase = System.getProperty("phase") + GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) + + partition.flatMap { case (key, msg) => + val optMsg = Graph.toGraphElement(msg).flatMap { element => + val arr = msg.split("\t", 7) + val service = element.serviceName + val label = arr(5) + val n = arr.length + + if (excludeServices.contains(service) || excludeLabels.contains(label)) { + None + } else if(n == 6) { + Some(Seq(msg, "{}", service).mkString("\t")) + } + else if(n == 7) { + Some(Seq(msg, service).mkString("\t")) + } + else { + None + } + } + optMsg + } + } + + val ts = time.milliseconds + val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts)) + + /** make sure that `elements` are not running at the same time */ + val elementsWritten = { + elements.cache() + (Array("all") ++ splits).foreach { + case split if split == "all" => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + elements.saveAsTextFile(path) + case split => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + val strlen = split.length + val splitData = elements.filter(_.takeRight(strlen) == split).cache() + val totalSize = splitData + .mapPartitions { iterator => + val s = iterator.map(_.length.toLong).sum + Iterator.single(s) + } + .sum + .toLong + val numPartitions = math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt) + splitData.coalesce(math.min(splitData.partitions.length, numPartitions)).saveAsTextFile(path) + splitData.unpersist() + } + elements.unpersist() + elements + } + + elementsWritten.mapPartitionsWithIndex { (i, part) => + // commit offset range + val osr = offsets(i) + getStreamHelper(kafkaParams).commitConsumerOffset(osr) + Iterator.empty + }.foreach { + (_: Nothing) => () + } + + (Array("all") ++ splits).foreach { split => + val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" + hiveContext.sql(s"use $hiveDatabase") + hiveContext.sql(s"alter table $hiveTable add partition (split='$split', date_id='$dateId', ts='$ts') location '$path'") + } + } + + logInfo(s"counter: ${mapAcc.value}") + println(s"counter: ${mapAcc.value}") + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/BulkLoadPartitioner.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/BulkLoadPartitioner.scala b/loader/src/main/scala/spark/BulkLoadPartitioner.scala deleted file mode 100644 index ed1587a..0000000 --- a/loader/src/main/scala/spark/BulkLoadPartitioner.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import java.util -import java.util.Comparator - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.Partitioner - -/** - * A Partitioner implementation that will separate records to different - * HBase Regions based on region splits - * - * @param startKeys The start keys for the given table - */ -class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) - extends Partitioner { - - override def numPartitions: Int = startKeys.length - - override def getPartition(key: Any): Int = { - - val rowKey:Array[Byte] = - key match { - case qualifier: KeyFamilyQualifier => - qualifier.rowKey - case _ => - key.asInstanceOf[Array[Byte]] - } - - val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { - override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { - Bytes.compareTo(o1, o2) - } - } - val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) - if (partition < 0) partition * -1 + -2 - else partition - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala b/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala deleted file mode 100644 index ab34b76..0000000 --- a/loader/src/main/scala/spark/ColumnFamilyQualifierMapKeyWrapper.scala +++ /dev/null @@ -1,73 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package spark -// -//import org.apache.hadoop.hbase.util.Bytes -// -///** -// * A wrapper class that will allow both columnFamily and qualifier to -// * be the key of a hashMap. 
Also allow for finding the value in a hashmap -// * with out cloning the HBase value from the HBase Cell object -// * @param columnFamily ColumnFamily byte array -// * @param columnFamilyOffSet Offset of columnFamily value in the array -// * @param columnFamilyLength Length of the columnFamily value in the columnFamily array -// * @param qualifier Qualifier byte array -// * @param qualifierOffSet Offset of qualifier value in the array -// * @param qualifierLength Length of the qualifier value with in the array -// */ -//class ColumnFamilyQualifierMapKeyWrapper(val columnFamily:Array[Byte], -// val columnFamilyOffSet:Int, -// val columnFamilyLength:Int, -// val qualifier:Array[Byte], -// val qualifierOffSet:Int, -// val qualifierLength:Int) -// extends Serializable{ -// -// override def equals(other:Any): Boolean = { -// val otherWrapper = other.asInstanceOf[ColumnFamilyQualifierMapKeyWrapper] -// -// Bytes.compareTo(columnFamily, -// columnFamilyOffSet, -// columnFamilyLength, -// otherWrapper.columnFamily, -// otherWrapper.columnFamilyOffSet, -// otherWrapper.columnFamilyLength) == 0 && Bytes.compareTo(qualifier, -// qualifierOffSet, -// qualifierLength, -// otherWrapper.qualifier, -// otherWrapper.qualifierOffSet, -// otherWrapper.qualifierLength) == 0 -// } -// -// override def hashCode():Int = { -// Bytes.hashCode(columnFamily, columnFamilyOffSet, columnFamilyLength) + -// Bytes.hashCode(qualifier, qualifierOffSet, qualifierLength) -// } -// -// def cloneColumnFamily():Array[Byte] = { -// val resultArray = new Array[Byte](columnFamilyLength) -// System.arraycopy(columnFamily, columnFamilyOffSet, resultArray, 0, columnFamilyLength) -// resultArray -// } -// -// def cloneQualifier():Array[Byte] = { -// val resultArray = new Array[Byte](qualifierLength) -// System.arraycopy(qualifier, qualifierOffSet, resultArray, 0, qualifierLength) -// resultArray -// } -//}
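Taken together with KeyFamilyQualifier above, the bulk-load path keys every cell by (rowKey, family, qualifier) so that cells can be partitioned by row key and sorted within partitions before HFiles are written; that is the role the deleted spark.BulkLoadPartitioner used to play. Below is a minimal sketch of that wiring, mirroring TransferToHFile.run; the object and helper names, the table name, and the staging directory are placeholders, and the input RDD of KeyValues is assumed to be produced elsewhere (for example by toKeyValues).

    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import scala.collection.JavaConversions._

    object BulkLoadSketch {
      // Sketch only: write an RDD of KeyValues out as HFiles under stagingDir,
      // keyed by KeyFamilyQualifier so HBaseContext can shuffle the cells by row key
      // and sort them by rowKey, then family, then qualifier.
      def writeHFiles(sc: SparkContext, kvs: RDD[KeyValue], zkQuorum: String,
                      tableName: String, stagingDir: String): Unit = {
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
        val hbaseSc = new HBaseContext(sc, hbaseConf)

        def toCell(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
          val key = new KeyFamilyQualifier(kv.getRow, kv.getFamily, kv.getQualifier)
          Iterator.single(key -> kv.getValue)
        }

        // Same HFile write options TransferToHFile uses for the "e" and "v" families.
        val familyOptions = new FamilyHFileWriteOptions("LZ4", "ROW", 32768, "FAST_DIFF")
        val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions)

        hbaseSc.bulkLoad(kvs, TableName.valueOf(tableName), toCell, stagingDir, familyOptionsMap)
      }
    }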