http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/HBaseRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/spark/HBaseRDDFunctions.scala deleted file mode 100644 index 8ff8d58..0000000 --- a/loader/src/main/scala/spark/HBaseRDDFunctions.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import java.util - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hbase.{HConstants, TableName} -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.spark.rdd.RDD - -import scala.collection.immutable.HashMap -import scala.reflect.ClassTag - -/** - * HBaseRDDFunctions contains a set of implicit functions that can be - * applied to a Spark RDD so that we can easily interact with HBase - */ -object HBaseRDDFunctions -{ - - /** - * These are implicit methods for a RDD that contains any type of - * data. - * - * @param rdd This is for rdd of any type - * @tparam T This is any type - */ - implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) { - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * put. This will not return a new RDD. Think of it like a foreach - * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param f The function that will turn the RDD values - * into HBase Put objects. - */ - def hbaseBulkPut(hc: HBaseContext, - tableName: TableName, - f: (T) => Put): Unit = { - hc.bulkPut(rdd, tableName, f) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * get. This will return a new RDD. Think about it as a RDD map - * function. In that every RDD value will get a new value out of - * HBase. That new value will populate the newly generated RDD. 
- * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param batchSize How many gets to execute in a single batch - * @param f The function that will turn the RDD values - * in HBase Get objects - * @param convertResult The function that will convert a HBase - * Result object into a value that will go - * into the resulting RDD - * @tparam R The type of Object that will be coming - * out of the resulting RDD - * @return A resulting RDD with type R objects - */ - def hbaseBulkGet[R: ClassTag](hc: HBaseContext, - tableName: TableName, batchSize:Int, - f: (T) => Get, convertResult: (Result) => R): RDD[R] = { - hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * get. This will return a new RDD. Think about it as a RDD map - * function. In that every RDD value will get a new value out of - * HBase. That new value will populate the newly generated RDD. - * - * @param hc The hbaseContext object to identify which - * HBase cluster connection to use - * @param tableName The tableName that the put will be sent to - * @param batchSize How many gets to execute in a single batch - * @param f The function that will turn the RDD values - * in HBase Get objects - * @return A resulting RDD with type R objects - */ - def hbaseBulkGet(hc: HBaseContext, - tableName: TableName, batchSize:Int, - f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = { - hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName, - batchSize, rdd, f, - result => if (result != null && result.getRow != null) { - (new ImmutableBytesWritable(result.getRow), result) - } else { - null - }) - } - - /** - * Implicit method that gives easy access to HBaseContext's bulk - * Delete. This will not return a new RDD. - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param tableName The tableName that the deletes will be sent to - * @param f The function that will convert the RDD value into - * a HBase Delete Object - * @param batchSize The number of Deletes to be sent in a single batch - */ - def hbaseBulkDelete(hc: HBaseContext, - tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = { - hc.bulkDelete(rdd, tableName, f, batchSize) - } - - /** - * Implicit method that gives easy access to HBaseContext's - * foreachPartition method. This will ack very much like a normal RDD - * foreach method but for the fact that you will now have a HBase connection - * while iterating through the values. - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param f This function will get an iterator for a Partition of an - * RDD along with a connection object to HBase - */ - def hbaseForeachPartition(hc: HBaseContext, - f: (Iterator[T], Connection) => Unit): Unit = { - hc.foreachPartition(rdd, f) - } - - /** - * Implicit method that gives easy access to HBaseContext's - * mapPartitions method. 
This will ask very much like a normal RDD - * map partitions method but for the fact that you will now have a - * HBase connection while iterating through the values - * - * @param hc The hbaseContext object to identify which HBase - * cluster connection to use - * @param f This function will get an iterator for a Partition of an - * RDD along with a connection object to HBase - * @tparam R This is the type of objects that will go into the resulting - * RDD - * @return A resulting RDD of type R - */ - def hbaseMapPartitions[R: ClassTag](hc: HBaseContext, - f: (Iterator[T], Connection) => Iterator[R]): - RDD[R] = { - hc.mapPartitions[T,R](rdd, f) - } - - /** - * Implicit method that gives easy access to HBaseContext's - * bulkLoad method. - * - * A Spark Implementation of HBase Bulk load - * - * This will take the content from an existing RDD then sort and shuffle - * it with respect to region splits. The result of that sort and shuffle - * will be written to HFiles. - * - * After this function is executed the user will have to call - * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase - * - * Also note this version of bulk load is different from past versions in - * that it includes the qualifier as part of the sort process. The - * reason for this is to be able to support rows will very large number - * of columns. - * - * @param tableName The HBase table we are loading into - * @param flatMap A flapMap function that will make every row in the RDD - * into N cells for the bulk load - * @param stagingDir The location on the FileSystem to bulk load into - * @param familyHFileWriteOptionsMap Options that will define how the HFile for a - * column family is written - * @param compactionExclude Compaction excluded for the HFiles - * @param maxSize Max size for the HFiles before they roll - */ - def hbaseBulkLoad(hc: HBaseContext, - tableName: TableName, - flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], - stagingDir:String, - familyHFileWriteOptionsMap: - util.Map[Array[Byte], FamilyHFileWriteOptions] = - new util.HashMap[Array[Byte], FamilyHFileWriteOptions](), - compactionExclude: Boolean = false, - maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = { - hc.bulkLoad(rdd, tableName, - flatMap, stagingDir, familyHFileWriteOptionsMap, - compactionExclude, maxSize) - } - } -}
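For reference, a minimal sketch of how the implicit functions deleted above were typically brought into scope and used from a Spark job. This is not part of the original commit; it assumes an HBase configuration is available on the classpath, and the table name "test_table", column family "e" and qualifier "q" are hypothetical placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import spark.HBaseContext
import spark.HBaseRDDFunctions._   // brings GenericHBaseRDDFunctions into implicit scope

object BulkPutGetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-rdd-functions-sketch"))
    val hc = new HBaseContext(sc, HBaseConfiguration.create())   // shared HBase connection management

    // Hypothetical input: (rowKey, value) pairs destined for column family "e".
    val rdd = sc.parallelize(Seq(("row1", "v1"), ("row2", "v2")))

    // hbaseBulkPut: each RDD element is turned into a Put and written in bulk (foreach-style, no result RDD).
    rdd.hbaseBulkPut(hc, TableName.valueOf("test_table"), { case (row, value) =>
      new Put(Bytes.toBytes(row)).addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
    })

    // hbaseBulkGet: each RDD element is turned into a Get; each Result is converted into a value of the new RDD (map-style).
    val fetched = rdd.hbaseBulkGet[String](hc, TableName.valueOf("test_table"), 100,
      { case (row, _) => new Get(Bytes.toBytes(row)) },
      result => Bytes.toString(result.value()))

    fetched.collect().foreach(println)
    sc.stop()
  }
}

The same pattern applies to hbaseBulkDelete, hbaseForeachPartition and hbaseMapPartitions: the HBaseContext owns the connection, and the per-element or per-partition function only builds the HBase request objects.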
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/JavaHBaseContext.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/JavaHBaseContext.scala b/loader/src/main/scala/spark/JavaHBaseContext.scala deleted file mode 100644 index 39ddf2a..0000000 --- a/loader/src/main/scala/spark/JavaHBaseContext.scala +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.TableName -import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} -import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction} -import org.apache.spark.streaming.api.java.JavaDStream - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -/** - * This is the Java Wrapper over HBaseContext which is written in - * Scala. This class will be used by developers that want to - * work with Spark or Spark Streaming in Java - * - * @param jsc This is the JavaSparkContext that we will wrap - * @param config This is the config information to out HBase cluster - */ -class JavaHBaseContext(@transient jsc: JavaSparkContext, - @transient config: Configuration) extends Serializable { - val hbaseContext = new HBaseContext(jsc.sc, config) - - /** - * A simple enrichment of the traditional Spark javaRdd foreachPartition. - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * @param javaRdd Original javaRdd with data to iterate over - * @param f Function to be given a iterator to iterate through - * the RDD values and a HConnection object to interact - * with HBase - */ - def foreachPartition[T](javaRdd: JavaRDD[T], - f: VoidFunction[(java.util.Iterator[T], Connection)]) = { - - hbaseContext.foreachPartition(javaRdd.rdd, - (it: Iterator[T], conn: Connection) => { - f.call((it, conn)) - }) - } - - /** - * A simple enrichment of the traditional Spark Streaming dStream foreach - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. 
All HConnection - * management is handled outside this method - * - * @param javaDstream Original DStream with data to iterate over - * @param f Function to be given a iterator to iterate through - * the JavaDStream values and a HConnection object to - * interact with HBase - */ - def foreachPartition[T](javaDstream: JavaDStream[T], - f: VoidFunction[(Iterator[T], Connection)]) = { - hbaseContext.foreachPartition(javaDstream.dstream, - (it: Iterator[T], conn: Connection) => f.call(it, conn)) - } - - /** - * A simple enrichment of the traditional Spark JavaRDD mapPartition. - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * Note: Make sure to partition correctly to avoid memory issue when - * getting data from HBase - * - * @param javaRdd Original JavaRdd with data to iterate over - * @param f Function to be given a iterator to iterate through - * the RDD values and a HConnection object to interact - * with HBase - * @return Returns a new RDD generated by the user definition - * function just like normal mapPartition - */ - def mapPartitions[T, R](javaRdd: JavaRDD[T], - f: FlatMapFunction[(java.util.Iterator[T], - Connection), R]): JavaRDD[R] = { - - def fn = (it: Iterator[T], conn: Connection) => - asScalaIterator( - f.call((asJavaIterator(it), conn)).iterator() - ) - - JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd, - (iterator: Iterator[T], connection: Connection) => - fn(iterator, connection))(fakeClassTag[R]))(fakeClassTag[R]) - } - - /** - * A simple enrichment of the traditional Spark Streaming JavaDStream - * mapPartition. - * - * This function differs from the original in that it offers the - * developer access to a already connected HConnection object - * - * Note: Do not close the HConnection object. All HConnection - * management is handled outside this method - * - * Note: Make sure to partition correctly to avoid memory issue when - * getting data from HBase - * - * @param javaDstream Original JavaDStream with data to iterate over - * @param mp Function to be given a iterator to iterate through - * the JavaDStream values and a HConnection object to - * interact with HBase - * @return Returns a new JavaDStream generated by the user - * definition function just like normal mapPartition - */ - def streamMap[T, U](javaDstream: JavaDStream[T], - mp: Function[(Iterator[T], Connection), Iterator[U]]): - JavaDStream[U] = { - JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream, - (it: Iterator[T], conn: Connection) => - mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U]) - } - - /** - * A simple abstraction over the HBaseContext.foreachPartition method. - * - * It allow addition support for a user to take JavaRDD - * and generate puts and send them to HBase. - * The complexity of managing the HConnection is - * removed from the developer - * - * @param javaRdd Original JavaRDD with data to iterate over - * @param tableName The name of the table to put into - * @param f Function to convert a value in the JavaRDD - * to a HBase Put - */ - def bulkPut[T](javaRdd: JavaRDD[T], - tableName: TableName, - f: Function[(T), Put]) { - - hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t)) - } - - /** - * A simple abstraction over the HBaseContext.streamMapPartition method. 
- * - * It allow addition support for a user to take a JavaDStream and - * generate puts and send them to HBase. - * - * The complexity of managing the HConnection is - * removed from the developer - * - * @param javaDstream Original DStream with data to iterate over - * @param tableName The name of the table to put into - * @param f Function to convert a value in - * the JavaDStream to a HBase Put - */ - def streamBulkPut[T](javaDstream: JavaDStream[T], - tableName: TableName, - f: Function[T, Put]) = { - hbaseContext.streamBulkPut(javaDstream.dstream, - tableName, - (t: T) => f.call(t)) - } - - /** - * A simple abstraction over the HBaseContext.foreachPartition method. - * - * It allow addition support for a user to take a JavaRDD and - * generate delete and send them to HBase. - * - * The complexity of managing the HConnection is - * removed from the developer - * - * @param javaRdd Original JavaRDD with data to iterate over - * @param tableName The name of the table to delete from - * @param f Function to convert a value in the JavaRDD to a - * HBase Deletes - * @param batchSize The number of deletes to batch before sending to HBase - */ - def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName, - f: Function[T, Delete], batchSize: Integer) { - hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize) - } - - /** - * A simple abstraction over the HBaseContext.streamBulkMutation method. - * - * It allow addition support for a user to take a JavaDStream and - * generate Delete and send them to HBase. - * - * The complexity of managing the HConnection is - * removed from the developer - * - * @param javaDStream Original DStream with data to iterate over - * @param tableName The name of the table to delete from - * @param f Function to convert a value in the JavaDStream to a - * HBase Delete - * @param batchSize The number of deletes to be sent at once - */ - def streamBulkDelete[T](javaDStream: JavaDStream[T], - tableName: TableName, - f: Function[T, Delete], - batchSize: Integer) = { - hbaseContext.streamBulkDelete(javaDStream.dstream, tableName, - (t: T) => f.call(t), - batchSize) - } - - /** - * A simple abstraction over the HBaseContext.mapPartition method. - * - * It allow addition support for a user to take a JavaRDD and generates a - * new RDD based on Gets and the results they bring back from HBase - * - * @param tableName The name of the table to get from - * @param batchSize batch size of how many gets to retrieve in a single fetch - * @param javaRdd Original JavaRDD with data to iterate over - * @param makeGet Function to convert a value in the JavaRDD to a - * HBase Get - * @param convertResult This will convert the HBase Result object to - * what ever the user wants to put in the resulting - * JavaRDD - * @return New JavaRDD that is created by the Get to HBase - */ - def bulkGet[T, U](tableName: TableName, - batchSize: Integer, - javaRdd: JavaRDD[T], - makeGet: Function[T, Get], - convertResult: Function[Result, U]): JavaRDD[U] = { - - JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName, - batchSize, - javaRdd.rdd, - (t: T) => makeGet.call(t), - (r: Result) => { - convertResult.call(r) - })(fakeClassTag[U]))(fakeClassTag[U]) - - } - - /** - * A simple abstraction over the HBaseContext.streamMap method. 
- * - * It allow addition support for a user to take a DStream and - * generates a new DStream based on Gets and the results - * they bring back from HBase - * - - * @param tableName The name of the table to get from - * @param batchSize The number of gets to be batched together - * @param javaDStream Original DStream with data to iterate over - * @param makeGet Function to convert a value in the JavaDStream to a - * HBase Get - * @param convertResult This will convert the HBase Result object to - * what ever the user wants to put in the resulting - * JavaDStream - * @return New JavaDStream that is created by the Get to HBase - */ - def streamBulkGet[T, U](tableName: TableName, - batchSize: Integer, - javaDStream: JavaDStream[T], - makeGet: Function[T, Get], - convertResult: Function[Result, U]) { - JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName, - batchSize, - javaDStream.dstream, - (t: T) => makeGet.call(t), - (r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U]) - } - - /** - * This function will use the native HBase TableInputFormat with the - * given scan object to generate a new JavaRDD - * - * @param tableName The name of the table to scan - * @param scans The HBase scan object to use to read data from HBase - * @param f Function to convert a Result object from HBase into - * What the user wants in the final generated JavaRDD - * @return New JavaRDD with results from scan - */ - def hbaseRDD[U](tableName: TableName, - scans: Scan, - f: Function[(ImmutableBytesWritable, Result), U]): - JavaRDD[U] = { - JavaRDD.fromRDD( - hbaseContext.hbaseRDD[U](tableName, - scans, - (v: (ImmutableBytesWritable, Result)) => - f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U]) - } - - /** - * A overloaded version of HBaseContext hbaseRDD that define the - * type of the resulting JavaRDD - * - * @param tableName The name of the table to scan - * @param scans The HBase scan object to use to read data from HBase - * @return New JavaRDD with results from scan - */ - def hbaseRDD(tableName: TableName, - scans: Scan): - JavaRDD[(ImmutableBytesWritable, Result)] = { - JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans)) - } - - /** - * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. - * - * This method is used to keep ClassTags out of the external Java API, as the Java compiler - * cannot produce them automatically. While this ClassTag-faking does please the compiler, - * it can cause problems at runtime if the Scala API relies on ClassTags for correctness. - * - * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, - * just worse performance or security issues. - * For instance, an Array[AnyRef] can hold any type T, - * but may lose primitive - * specialization. - */ - private[spark] - def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/spark/KeyFamilyQualifier.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/spark/KeyFamilyQualifier.scala b/loader/src/main/scala/spark/KeyFamilyQualifier.scala deleted file mode 100644 index 4a00b63..0000000 --- a/loader/src/main/scala/spark/KeyFamilyQualifier.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import java.io.Serializable - -import org.apache.hadoop.hbase.util.Bytes - -/** - * This is the key to be used for sorting and shuffling. - * - * We will only partition on the rowKey but we will sort on all three - * - * @param rowKey Record RowKey - * @param family Record ColumnFamily - * @param qualifier Cell Qualifier - */ -class KeyFamilyQualifier(val rowKey:Array[Byte], val family:Array[Byte], val qualifier:Array[Byte]) - extends Comparable[KeyFamilyQualifier] with Serializable { - override def compareTo(o: KeyFamilyQualifier): Int = { - var result = Bytes.compareTo(rowKey, o.rowKey) - if (result == 0) { - result = Bytes.compareTo(family, o.family) - if (result == 0) result = Bytes.compareTo(qualifier, o.qualifier) - } - result - } - override def toString: String = { - Bytes.toString(rowKey) + ":" + Bytes.toString(family) + ":" + Bytes.toString(qualifier) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala deleted file mode 100644 index f4f7865..0000000 --- a/loader/src/main/scala/subscriber/GraphSubscriber.scala +++ /dev/null @@ -1,339 +0,0 @@ -package subscriber - - -import java.util - -import com.kakao.s2graph.core.{Graph, _} -import com.typesafe.config.{Config, ConfigFactory} -import kafka.javaapi.producer.Producer -import kafka.producer.KeyedMessage -import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} -import org.apache.hadoop.hbase.client._ -import org.apache.spark.{Accumulable, SparkContext} -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap -import scala.concurrent.ExecutionContext - -object GraphConfig { - var database = "" - var zkQuorum = "" - var kafkaBrokers = "" - var cacheTTL = s"${60 * 60 * 1}" - def apply(phase: String, dbUrl: Option[String], zkAddr: Option[String], kafkaBrokerList: Option[String]): Config = { - database = dbUrl.getOrElse("jdbc:mysql://localhost:3306/graph_dev") - zkQuorum = zkAddr.getOrElse("localhost") - -// val newConf = new util.HashMap[String, Object]() -// newConf.put("hbase.zookeeper.quorum", zkQuorum) -// newConf.put("db.default.url", database) -// newConf.put("kafka.metadata.broker.list", kafkaBrokers) - val newConf = - if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL) - else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL) - - 
ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig) - } -} - -object GraphSubscriberHelper extends WithKafka { - - - type HashMapAccumulable = Accumulable[HashMap[String, Long], (String, Long)] - - - lazy val producer = new Producer[String, String](kafkaConf(GraphConfig.kafkaBrokers)) - var config: Config = _ - private val writeBufferSize = 1024 * 1024 * 8 - private val sleepPeriod = 10000 - private val maxTryNum = 10 - - var g: Graph = null - var management: Management = null - val conns = new scala.collection.mutable.HashMap[String, Connection]() - - def toOption(s: String) = { - s match { - case "" | "none" => None - case _ => Some(s) - } - } - - def apply(phase: String, dbUrl: String, zkQuorum: String, kafkaBrokerList: String): Unit = { - config = GraphConfig(phase, toOption(dbUrl), toOption(zkQuorum), toOption(kafkaBrokerList)) - - if (g == null) { - val ec = ExecutionContext.Implicits.global - g = new Graph(config)(ec) - management = new Management(g)(ec) - } - } - - def getConn(zkQuorum: String): Connection = { - conns.getOrElseUpdate(zkQuorum, { - val hbaseConf = HBaseConfiguration.create() - hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) - ConnectionFactory.createConnection(hbaseConf) - }) - conns(zkQuorum) - } - // def apply(phase: String, dbUrl: Option[String], zkQuorum: Option[String], kafkaBrokerList: Option[String]): Unit = { - // Graph.apply(GraphConfig(phase, dbUrl, zkQuorum, kafkaBrokerList))(ExecutionContext.Implicits.global) - // } - def report(key: String, value: Option[String], topic: String = "report") = { - val msg = Seq(Some(key), value).flatten.mkString("\t") - val kafkaMsg = new KeyedMessage[String, String](topic, msg) - producer.send(kafkaMsg) - } - - def toGraphElements(msgs: Seq[String], labelMapping: Map[String, String] = Map.empty) - (statFunc: (String, Int) => Unit): Iterable[GraphElement] = { - (for (msg <- msgs) yield { - statFunc("total", 1) - Graph.toGraphElement(msg, labelMapping) match { - case Some(e) if e.isInstanceOf[Edge] => - statFunc("EdgeParseOk", 1) - e.asInstanceOf[Edge] - case Some(v) if v.isInstanceOf[Vertex] => - statFunc("VertexParseOk", 1) - v.asInstanceOf[Vertex] - case Some(x) => - throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}") - case None => - throw new RuntimeException(s"GraphSubscriber.toGraphElements: parsing failed. 
$msg") - } - - }).toList - } - -// private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int) -// (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = { -// if (tryNum <= 0) { -// statFunc("errorStore", elementsSize) -// throw new RuntimeException(s"retry failed after $maxTryNum") -// } -// val conn = getConn(zkQuorum) -// val mutator = conn.getBufferedMutator(TableName.valueOf(tableName)) -// // val table = conn.getTable(TableName.valueOf(tableName)) -// // table.setAutoFlush(false, false) -// -// try { -// puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) } -// mutator.mutate(puts) -// // table.put(puts) -// statFunc(s"$statPrefix:storeOk", elementsSize) -// } catch { -// case e: Throwable => -// e.printStackTrace() -// Thread.sleep(sleepPeriod) -// storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc) -// } finally { -// mutator.close() -// // table.close() -// } -// } -// -// def storeDegreeBulk(zkQuorum: String, tableName: String) -// (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty) -// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { -// val counts = HashMap[String, Long]() -// val statFunc = storeStat(counts)(mapAccOpt) _ -// -// for { -// (vertexId, labelName, direction, degreeVal) <- degrees -// incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal) -// } { -// storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree") -// } -// counts -// } -// def storeBulk(zkQuorum: String, tableName: String) -// (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false) -// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = { -// -// val counts = HashMap[String, Long]() -// val statFunc = storeStat(counts)(mapAccOpt) _ -// val elements = toGraphElements(msgs, labelMapping)(statFunc) -// -// val puts = elements.flatMap { element => -// element match { -// case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") => -// v.buildPuts() -// case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") => -// EdgeWriter(e).insertBulkForLoader(autoCreateEdge) -// case _ => Nil -// } -// } toList -// -// storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc) -// counts -// } - - def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = { - counts.put(key, counts.getOrElse(key, 0L) + value) - mapAccOpt match { - case None => - case Some(mapAcc) => mapAcc += (key -> value) - } - } - - def toLabelMapping(lableMapping: String): Map[String, String] = { - (for { - token <- lableMapping.split(",") - inner = token.split(":") if inner.length == 2 - } yield { - (inner.head, inner.last) - }).toMap - } - - def isValidQuorum(quorum: String) = { - quorum.split(",").size > 1 - } -} - -//object GraphSubscriber extends SparkApp with WithKafka { -// val sleepPeriod = 5000 -// val usages = -// s""" -// |/** -// |* this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API. -// |* params: -// |* 0. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix -// |* 1. dbUrl: jdbc database connection string to specify database for meta. -// |* 2. 
labelMapping: oldLabel:newLabel delimited by , -// |* 3. zkQuorum: target hbase zkQuorum where this job will publish data to. -// |* 4. hTableName: target hbase physical table name where this job will publish data to. -// |* 5. batchSize: how many edges will be batched for Put request to target hbase. -// |* 6. kafkaBrokerList: using kafka as fallback queue. when something goes wrong during batch, data needs to be replay will be stored in kafka. -// |* 7. kafkaTopic: fallback queue topic. -// |* 8. edgeAutoCreate: true if need to create reversed edge automatically. -// |* -// |* after this job finished, s2graph will have data with sequence corresponding newLabelName. -// |* change this newLabelName to ogirinalName if you want to online replace of label. -// |* -// |*/ -// """.stripMargin -// -// override def run() = { -// /** -// * Main function -// */ -// println(args.toList) -//// if (args.length != 10) { -//// System.err.println(usages) -//// System.exit(1) -//// } -// val hdfsPath = args(0) -// val dbUrl = args(1) -// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2)) -// -// val zkQuorum = args(3) -// val hTableName = args(4) -// val batchSize = args(5).toInt -// val kafkaBrokerList = args(6) -// val kafkaTopic = args(7) -// val edgeAutoCreate = args(8).toBoolean -// val vertexDegreePathOpt = if (args.length >= 10) GraphSubscriberHelper.toOption(args(9)) else None -// -// val conf = sparkConf(s"$hdfsPath: GraphSubscriber") -// val sc = new SparkContext(conf) -// val mapAcc = sc.accumulable(HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _)) -// -// -// if (!GraphSubscriberHelper.isValidQuorum(zkQuorum)) throw new RuntimeException(s"$zkQuorum is not valid.") -// -// /** this job expect only one hTableName. all labels in this job will be stored in same physical hbase table */ -// try { -// -// import GraphSubscriberHelper._ -// // set local driver setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */ -// -// if (labelMapping.isEmpty) { -// // pass -// } else { -// for { -// (oldLabelName, newLabelName) <- labelMapping -// } { -// Management.copyLabel(oldLabelName, newLabelName, toOption(hTableName)) -// } -// } -// -// vertexDegreePathOpt.foreach { vertexDegreePath => -// val vertexDegrees = sc.textFile(vertexDegreePath).filter(line => line.split("\t").length == 4).map { line => -// val tokens = line.split("\t") -// (tokens(0), tokens(1), tokens(2), tokens(3).toInt) -// } -// vertexDegrees.foreachPartition { partition => -// -// // init Graph -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// val counts = GraphSubscriberHelper.storeDegreeBulk(zkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc)) -// for ((k, v) <- counts) { -// mapAcc +=(k, v) -// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg.toString(), Some(e.getMessage()), topic = kafkaTopic) -// } -// } -// } -// } -// } -// -// -// val msgs = sc.textFile(hdfsPath) -// msgs.foreachPartition(partition => { -// // set executor setting. 
-// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// // val counts = -// // GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc)) -// val counts = -// GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping, edgeAutoCreate)(Some(mapAcc)) -// -// for ((k, v) <- counts) { -// mapAcc +=(k, v) -// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $zkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = kafkaTopic) -// } -// } -// } -// }) -// -// logInfo(s"counter: $mapAcc") -// println(s"Stats: ${mapAcc}") -// -// } catch { -// case e: Throwable => -// println(s"job failed with exception: $e") -// throw e -// } -// -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala b/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala deleted file mode 100644 index 1063560..0000000 --- a/loader/src/main/scala/subscriber/GraphSubscriberStreaming.scala +++ /dev/null @@ -1,103 +0,0 @@ -package subscriber - -import org.apache.hadoop.hbase.HBaseConfiguration -import org.apache.hadoop.hbase.client.{ConnectionFactory} -import org.apache.spark.streaming.Durations._ -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps - -//object GraphSubscriberStreaming extends SparkApp with WithKafka { -// val usages = -// s""" -// |/** -// |this job consume edges/vertices from kafka topic then load them into s2graph. -// |params: -// | 1. kafkaZkQuorum: kafka zk address to consume events -// | 2. brokerList: kafka cluster`s broker list. -// | 3. topics: , delimited list of topics to consume -// | 4. intervalInSec: batch interval for this job. -// | 5. batchSize: how many edges/vertices will be grouped for bulk mutations. -// | 6. hbaseZkQuorum: s2graph zookeeper address. -// | 7. hTableName: physical hbase table name. -// | 8. 
labelMapping: oldLabel:newLabel delimited by , -// |*/ -// """.stripMargin -// override def run() = { -// if (args.length != 9) { -// System.err.println(usages) -// System.exit(1) -// } -// val kafkaZkQuorum = args(0) -// val brokerList = args(1) -// val topics = args(2) -// val intervalInSec = seconds(args(3).toLong) -// val dbUrl = args(4) -// val batchSize = args(5).toInt -// val hbaseZkQuorum = args(6) -// val hTableName = args(7) -// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(8)) -// -// -// if (!GraphSubscriberHelper.isValidQuorum(hbaseZkQuorum)) -// throw new RuntimeException(s"$hbaseZkQuorum is not valid.") -// -// val conf = sparkConf(s"$topics: GraphSubscriberStreaming") -// val ssc = streamingContext(conf, intervalInSec) -// val sc = ssc.sparkContext -// -// val groupId = topics.replaceAll(",", "_") + "_stream" -// val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" -// -// val kafkaParams = Map( -// "zookeeper.connect" -> kafkaZkQuorum, -// "group.id" -> groupId, -// "zookeeper.connection.timeout.ms" -> "10000", -// "metadata.broker.list" -> brokerList, -// "auto.offset.reset" -> "largest") -// -// val stream = createKafkaValueStreamMulti(ssc, kafkaParams, topics, 8, None).flatMap(s => s.split("\n")) -// -// val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) -// -// -// stream.foreachRDD(rdd => { -// -// rdd.foreachPartition(partition => { -// // set executor setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, hbaseZkQuorum, brokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// // val counts = -// // GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc)) -// val counts = -// GraphSubscriberHelper.storeBulk(hbaseZkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc)) -// -// for ((k, v) <- counts) { -// mapAcc +=(k, v) -// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $hbaseZkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = fallbackTopic) -// } -// } -// } -// }) -// }) -// -// -// logInfo(s"counter: ${mapAcc.value}") -// println(s"counter: ${mapAcc.value}") -// ssc.start() -// ssc.awaitTermination() -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/KafkaToHdfs.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/KafkaToHdfs.scala b/loader/src/main/scala/subscriber/KafkaToHdfs.scala deleted file mode 100644 index 785cb69..0000000 --- a/loader/src/main/scala/subscriber/KafkaToHdfs.scala +++ /dev/null @@ -1,111 +0,0 @@ -package subscriber - -import org.apache.spark.streaming.Durations._ -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps - -/** - * Created by shon on 9/2/15. - */ -//object KafkaToHdfs extends SparkApp with WithKafka { -// val usages = -// s""" -// |/** -// |this job consume edges/vertices from kafka topic then load them into s2graph. -// |params: -// | 1. kafkaZkQuorum: kafka zk address to consume events -// | 2. brokerList: kafka cluster`s broker list. -// | 3. 
topics: , delimited list of topics to consume -// | 4. intervalInSec: batch interval for this job. -// | 5. batchSize: how many edges/vertices will be grouped for bulk mutations. -// | 6. hbaseZkQuorum: s2graph zookeeper address. -// | 7. hTableName: physical hbase table name. -// | 8. labelMapping: oldLabel:newLabel delimited by , -// |*/ -// """.stripMargin -// override def run() = { -// if (args.length != 9) { -// System.err.println(usages) -// System.exit(1) -// } -// val kafkaZkQuorum = args(0) -// val brokerList = args(1) -// val topics = args(2) -// val intervalInSec = seconds(args(3).toLong) -// val dbUrl = args(4) -// val batchSize = args(5).toInt -// val hbaseZkQuorum = args(6) -// val hTableName = args(7) -// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(8)) -// -// -// if (!GraphSubscriberHelper.isValidQuorum(hbaseZkQuorum)) -// throw new RuntimeException(s"$hbaseZkQuorum is not valid.") -// -// val conf = sparkConf(s"$topics: GraphSubscriberStreaming") -// val ssc = streamingContext(conf, intervalInSec) -// val sc = ssc.sparkContext -// -// val groupId = topics.replaceAll(",", "_") + "_stream" -// val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" -// -// val kafkaParams = Map( -// "zookeeper.connect" -> kafkaZkQuorum, -// "group.id" -> groupId, -// "zookeeper.connection.timeout.ms" -> "10000", -// "metadata.broker.list" -> brokerList, -// "auto.offset.reset" -> "largest") -// -// val stream = createKafkaValueStreamMulti(ssc, kafkaParams, topics, 8, None).flatMap(s => s.split("\n")) -// -// val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) -// -// -// stream.foreachRDD(rdd => { -// -// rdd.foreachPartition(partition => { -// // set executor setting. 
-// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, hbaseZkQuorum, brokerList) -// -// partition.grouped(batchSize).foreach { msgs => -// try { -// val start = System.currentTimeMillis() -// // val counts = -// // GraphSubscriberHelper.store(msgs, GraphSubscriberHelper.toOption(newLabelName))(Some(mapAcc)) -// val dummyStatFunc = (s: String, i: Int) => {} -// val elements = GraphSubscriberHelper.toGraphElements(msgs)(dummyStatFunc) -// for { -// element <- elements -// } { -// element.serviceName -// } -//// val counts = -//// GraphSubscriberHelper.storeBulk(hbaseZkQuorum, hTableName)(msgs, labelMapping)(Some(mapAcc)) -//// -//// for ((k, v) <- counts) { -//// mapAcc +=(k, v) -//// } -// val duration = System.currentTimeMillis() - start -// println(s"[Success]: store, $mapAcc, $duration, $hbaseZkQuorum, $hTableName") -// } catch { -// case e: Throwable => -// println(s"[Failed]: store $e") -// -// msgs.foreach { msg => -// GraphSubscriberHelper.report(msg, Some(e.getMessage()), topic = fallbackTopic) -// } -// } -// } -// }) -// }) -// -// -// logInfo(s"counter: ${mapAcc.value}") -// println(s"counter: ${mapAcc.value}") -// ssc.start() -// ssc.awaitTermination() -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/TestEdgeBuilder.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/TestEdgeBuilder.scala b/loader/src/main/scala/subscriber/TestEdgeBuilder.scala deleted file mode 100644 index f5f43f8..0000000 --- a/loader/src/main/scala/subscriber/TestEdgeBuilder.scala +++ /dev/null @@ -1,66 +0,0 @@ -package subscriber - -import org.apache.spark.SparkContext -import s2.spark.{SparkApp, WithKafka} - -import scala.util.Random - -/** - * Created by shon on 7/27/15. 
- */ -//object TestEdgeBuilder extends SparkApp with WithKafka { -// val sleepPeriod = 5000 -// val usages = -// s""" -// |/** -// |0: numOfRows = -// |*/ -// """.stripMargin -// -// override def run() = { -// /** -// * label definition can be found on migrate/s2graph/bmt.schema -// * Main function -// * numOfRows: number of rows -// * numOfCols: number of cols -// * numOfMetas: number of metas -// * -// */ -// println(args.toList) -// val conf = sparkConf(s"TestEdgeBuilder") -// val sc = new SparkContext(conf) -// val phase = args(0) -// val dbUrl = args(1) -// val zkQuorum = args(2) -// val hTableName = args(3) -// val labelName = args(4) -// val metaName = args(5) -// -// val numOfRows = if (args.length >= 7) args(6).toInt else 100000 -// val numOfCols = if (args.length >= 8) args(7).toInt else 10000 -// val dimOfCols = if (args.length >= 9) args(8).toInt else 10000 -// val numOfSlice = if (args.length >= 10) args(9).toInt else 10 -// val batchSize = if (args.length >= 11) args(10).toInt else 100 -// -// sc.parallelize((0 until numOfRows), numOfSlice).foreachPartition { partition => -// -// GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, "none") -// -// partition.grouped(batchSize).foreach { rows => -// for { -// rowId <- rows -// } { -// val ts = System.currentTimeMillis() -// val msgs = for { -// colId <- (0 until numOfCols) -// metaId = Random.nextInt(dimOfCols) -// } yield { -// List(ts, "insertBulk", "edge", rowId, colId, labelName, s"""{"$metaName": $metaId}""").mkString("\t") -// } -// GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs)(None) -// } -// } -// } -// } -//} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala b/loader/src/main/scala/subscriber/TransferToHFile.scala deleted file mode 100644 index 516bb39..0000000 --- a/loader/src/main/scala/subscriber/TransferToHFile.scala +++ /dev/null @@ -1,195 +0,0 @@ -package subscriber - - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.{LabelMeta, Label} -import com.kakao.s2graph.core.types.{InnerValLikeWithTs, LabelWithDirection, SourceVertexId} -import org.apache.hadoop.hbase.client.Put -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat} -import org.apache.hadoop.hbase._ -import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes -import org.apache.spark.{SparkContext} -import org.apache.spark.rdd.RDD -import org.hbase.async.{PutRequest} -import play.api.libs.json.Json -import s2.spark.{SparkApp} -import spark.{FamilyHFileWriteOptions, KeyFamilyQualifier, HBaseContext} -import scala.collection.JavaConversions._ - - -object TransferToHFile extends SparkApp with JSONParser { - - val usages = - s""" - |create HFiles for hbase table on zkQuorum specified. - |note that hbase table is created already and pre-splitted properly. - | - |params: - |0. input: hdfs path for tsv file(bulk format). - |1. output: hdfs path for storing HFiles. - |2. zkQuorum: running hbase cluster zkQuorum. - |3. tableName: table name for this bulk upload. - |4. dbUrl: db url for parsing to graph element. - """.stripMargin - - //TODO: Process AtomicIncrementRequest too. 
- /** build key values */ - case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) - - private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = { - val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) - buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e => - e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) } - } - } - - def buildDegrees(msgs: RDD[String], labelMapping: Map[String, String], edgeAutoCreate: Boolean) = { - for { - msg <- msgs - tokens = GraphUtil.split(msg) - if tokens(2) == "e" || tokens(2) == "edge" - tempDirection = if (tokens.length == 7) "out" else tokens(7) - direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - reverseDirection = if (direction == "out") "in" else "out" - convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) - (vertexIdStr, vertexIdStrReversed) = (tokens(3), tokens(4)) - degreeKey = DegreeKey(vertexIdStr, convertedLabelName, direction) - degreeKeyReversed = DegreeKey(vertexIdStrReversed, convertedLabelName, reverseDirection) - extra = if (edgeAutoCreate) List(degreeKeyReversed -> 1L) else Nil - output <- List(degreeKey -> 1L) ++ extra - } yield output - } - def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.storage.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList - kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } - } - def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.toList - kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } - } - def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { - val label = Label.findByName(labelName).getOrElse(throw new RuntimeException(s"$labelName is not found in DB.")) - val dir = GraphUtil.directions(direction) - val innerVal = jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse { - throw new RuntimeException(s"$vertexId can not be converted into innerval") - } - val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal)) - - val ts = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val labelWithDir = LabelWithDirection(label.id.get, dir) - val edge = Edge(vertex, vertex, labelWithDir, propsWithTs=propsWithTs) - - edge.edgesWithIndex.flatMap { indexEdge => - GraphSubscriberHelper.g.storage.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => - new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) - } - } - } - - def toKeyValues(degreeKeyVals: Seq[(DegreeKey, Long)]): Iterator[KeyValue] = { - val kvs = for { - (key, value) <- degreeKeyVals - putRequest <- buildDegreePutRequests(key.vertexIdStr, key.labelName, key.direction, value) - } yield { - val p = putRequest - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - kv - } - kvs.toIterator - } - - def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { - val kvs = for { - s <- strs - element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge] - edge = 
element.asInstanceOf[Edge] - putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate) - } yield { - val p = putRequest - val kv = new KeyValue(p.key(), p.family(), p.qualifier, p.timestamp, p.value) - - // println(s"[Edge]: $edge\n[Put]: $p\n[KeyValue]: ${kv.getRow.toList}, ${kv.getQualifier.toList}, ${kv.getValue.toList}, ${kv.getTimestamp}") - - kv - } - kvs.toIterator - } - - - override def run() = { - val input = args(0) - val tmpPath = args(1) - val zkQuorum = args(2) - val tableName = args(3) - val dbUrl = args(4) - val maxHFilePerResionServer = args(5).toInt - val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String] - val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false - val buildDegree = if (args.length >= 9) args(8).toBoolean else true - val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4" - val conf = sparkConf(s"$input: TransferToHFile") - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryoserializer.buffer.mb", "24") - - val sc = new SparkContext(conf) - - Management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm) - - /** set up hbase init */ - val hbaseConf = HBaseConfiguration.create() - hbaseConf.set("hbase.zookeeper.quorum", zkQuorum) - hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) - hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName") - - - val rdd = sc.textFile(input) - - - val kvs = rdd.mapPartitions { iter => - val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") - toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate) - } - // - // val newRDD = if (!buildDegree) new HFileRDD(kvs) - // else { - // val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => - // agg + current - // }.mapPartitions { iter => - // val phase = System.getProperty("phase") - // GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") - // toKeyValues(iter.toSeq) - // } - // new HFileRDD(kvs ++ degreeKVs) - // } - // - // newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath) - val merged = if (!buildDegree) kvs - else { - kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => - agg + current - }.mapPartitions { iter => - val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") - toKeyValues(iter.toSeq) - } - } - - val hbaseSc = new HBaseContext(sc, hbaseConf) - def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = { - val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier()) - val v = kv.getValue() - Seq((k -> v)).toIterator - } - val familyOptions = new FamilyHFileWriteOptions(Algorithm.LZ4.getName.toUpperCase, - BloomType.ROW.name().toUpperCase, 32768, DataBlockEncoding.FAST_DIFF.name().toUpperCase) - val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions) - - hbaseSc.bulkLoad(merged, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala b/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala deleted file mode 100644 index 
c8255dd..0000000 --- a/loader/src/main/scala/subscriber/VertexDegreeBuilder.scala +++ /dev/null @@ -1,102 +0,0 @@ -package subscriber - -/** - * Created by shon on 7/3/15. - */ - -import com.kakao.s2graph.core.{GraphUtil, Management} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import s2.spark.{HashMapParam, WithKafka, SparkApp} - -import scala.collection.mutable - -/** - * Created by shon on 7/3/15. - */ - -//object VertexDegreeBuilder extends SparkApp with WithKafka { -// val sleepPeriod = 5000 -// val usages = -// s""" -// |/** -// |* this job read edge format(TSV) from HDFS file system then bulk load edges into s2graph. assumes that newLabelName is already created by API. -// |* params: -// |* 0. hdfsPath: where is your data in hdfs. require full path with hdfs:// predix -// |* 1. dbUrl: jdbc database connection string to specify database for meta. -// |* 2. labelMapping: oldLabel:newLabel delimited by , -// |* 3. outputPath: degree output Path. -// |* 4. edgeAutoCreate: true if need to create reversed edge automatically. -// |* -// |* after this job finished, s2graph will have data with sequence corresponding newLabelName. -// |* change this newLabelName to ogirinalName if you want to online replace of label. -// |* -// |*/ -// """.stripMargin -// -// override def run() = { -// /** -// * Main function -// */ -// println(args.toList) -// if (args.length != 5) { -// System.err.println(usages) -// System.exit(1) -// } -// val hdfsPath = args(0) -// val dbUrl = args(1) -// val labelMapping = GraphSubscriberHelper.toLabelMapping(args(2)) -// val outputPath = args(3) -// val edgeAutoCreate = args(4).toBoolean -// -// val conf = sparkConf(s"$hdfsPath: VertexDegreeBuilder") -// val sc = new SparkContext(conf) -// val mapAcc = sc.accumulable(mutable.HashMap.empty[String, Long], "counter")(HashMapParam[String, Long](_ + _)) -// -// /** this job expect only one hTableName. all labels in this job will be stored in same physical hbase table */ -// try { -// -// // set local driver setting. -// val phase = System.getProperty("phase") -// GraphSubscriberHelper.apply(phase, dbUrl, "none", "none") -// -// /** copy when oldLabel exist and newLabel done exist. otherwise ignore. */ -// -// -// val msgs = sc.textFile(hdfsPath) -// -// /** change assumption here. 
this job only take care of one label data */ -// val degreeStart: RDD[((String, String, String), Int)] = msgs.filter { msg => -// val tokens = GraphUtil.split(msg) -// (tokens(2) == "e" || tokens(2) == "edge") -// } flatMap { msg => -// val tokens = GraphUtil.split(msg) -// val tempDirection = if (tokens.length == 7) "out" else tokens(7) -// val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection -// val reverseDirection = if (direction == "out") "in" else "out" -// val convertedLabelName = labelMapping.get(tokens(5)).getOrElse(tokens(5)) -// val (vertexWithLabel, reverseVertexWithLabel) = if (direction == "out") { -// ( -// (tokens(3), convertedLabelName, direction), -// (tokens(4), convertedLabelName, reverseDirection) -// ) -// } else { -// ( -// (tokens(4), convertedLabelName, direction), -// (tokens(3), convertedLabelName, reverseDirection) -// ) -// } -// if (edgeAutoCreate) { -// List((vertexWithLabel -> 1), (reverseVertexWithLabel -> 1)) -// } else { -// List((vertexWithLabel -> 1)) -// } -// } -// val vertexDegrees = degreeStart.reduceByKey(_ + _) -// vertexDegrees.map { case ((vertexId, labelName, dir), degreeVal) => -// Seq(vertexId, labelName, dir, degreeVal).mkString("\t") -// }.saveAsTextFile(outputPath) -// -// } -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/WalLogStat.scala b/loader/src/main/scala/subscriber/WalLogStat.scala deleted file mode 100644 index f5db2c1..0000000 --- a/loader/src/main/scala/subscriber/WalLogStat.scala +++ /dev/null @@ -1,94 +0,0 @@ -package subscriber - -import java.text.SimpleDateFormat -import java.util.Date - -import com.kakao.s2graph.core.Graph -import kafka.producer.KeyedMessage -import kafka.serializer.StringDecoder -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.HasOffsetRanges -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps - -object WalLogStat extends SparkApp with WithKafka { - - override def run() = { - - validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "statTopic") - - val kafkaZkQuorum = args(0) - val brokerList = args(1) - val topics = args(2) - val intervalInSec = seconds(args(3).toLong) - val dbUrl = args(4) - val statTopic = args(5) - - - val conf = sparkConf(s"$topics: ${getClass.getSimpleName}") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - val groupId = topics.replaceAll(",", "_") + "_stat" - - val kafkaParams = Map( - "zookeeper.connect" -> kafkaZkQuorum, - "group.id" -> groupId, - "metadata.broker.list" -> brokerList, - "zookeeper.connection.timeout.ms" -> "10000", - "auto.offset.reset" -> "largest") - - val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) - val statProducer = getProducer[String, String](brokerList) - - stream.foreachRDD { (rdd, time) => - - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - - val ts = time.milliseconds - - val elements = rdd.mapPartitions { partition => - // set executor setting. 
- val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) - partition.map { case (key, msg) => - Graph.toGraphElement(msg) match { - case Some(elem) => - val serviceName = elem.serviceName - msg.split("\t", 7) match { - case Array(_, operation, log_type, _, _, label, _*) => - Seq(serviceName, label, operation, log_type).mkString("\t") - case _ => - Seq("no_service_name", "no_label", "no_operation", "parsing_error").mkString("\t") - } - case None => - Seq("no_service_name", "no_label", "no_operation", "no_element_error").mkString("\t") - } - } - } - - val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect() - val totalCount = countByKey.map(_._2).sum - val keyedMessage = countByKey.map { case (key, value) => - new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount") - } - - statProducer.send(keyedMessage: _*) - - elements.mapPartitionsWithIndex { (i, part) => - // commit offset range - val osr = offsets(i) - getStreamHelper(kafkaParams).commitConsumerOffset(osr) - Iterator.empty - }.foreach { - (_: Nothing) => () - } - - } - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/subscriber/WalLogToHDFS.scala deleted file mode 100644 index 44902c2..0000000 --- a/loader/src/main/scala/subscriber/WalLogToHDFS.scala +++ /dev/null @@ -1,149 +0,0 @@ -package subscriber - -import java.text.SimpleDateFormat -import java.util.Date - -import com.kakao.s2graph.core.Graph -import kafka.serializer.StringDecoder -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.HasOffsetRanges -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps - -object WalLogToHDFS extends SparkApp with WithKafka { - - override def run() = { - - validateArgument("kafkaZkQuorum", "brokerList", "topics", "intervalInSec", "dbUrl", "outputPath", "hiveDatabase", "hiveTable", "splitListPath") - - val kafkaZkQuorum = args(0) - val brokerList = args(1) - val topics = args(2) - val intervalInSec = seconds(args(3).toLong) - val dbUrl = args(4) - val outputPath = args(5) - val hiveDatabase = args(6) - val hiveTable = args(7) - val splitListPath = args(8) - - val conf = sparkConf(s"$topics: WalLogToHDFS") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - val groupId = topics.replaceAll(",", "_") + "_stream" - val fallbackTopic = topics.replaceAll(",", "_") + "_stream_failed" - - val kafkaParams = Map( - "zookeeper.connect" -> kafkaZkQuorum, - "group.id" -> groupId, - "metadata.broker.list" -> brokerList, - "zookeeper.connection.timeout.ms" -> "10000", - "auto.offset.reset" -> "largest") - - val stream = getStreamHelper(kafkaParams).createStream[String, String, StringDecoder, StringDecoder](ssc, topics.split(",").toSet) - - val mapAcc = sc.accumulable(new MutableHashMap[String, Long](), "Throughput")(HashMapParam[String, Long](_ + _)) - - val hdfsBlockSize = 134217728 // 128M - val hiveContext = new HiveContext(sc) - var splits = Array[String]() - var excludeLabels = Set[String]() - var excludeServices = Set[String]() - stream.foreachRDD { (rdd, time) => - try { - val read = 
sc.textFile(splitListPath).collect().map(_.split("=")).flatMap { - case Array(value) => Some(("split", value)) - case Array(key, value) => Some((key, value)) - case _ => None - } - splits = read.filter(_._1 == "split").map(_._2) - excludeLabels = read.filter(_._1 == "exclude_label").map(_._2).toSet - excludeServices = read.filter(_._1 == "exclude_service").map(_._2).toSet - } catch { - case _: Throwable => // use previous information - } - - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - - val elements = rdd.mapPartitions { partition => - // set executor setting. - val phase = System.getProperty("phase") - GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) - - partition.flatMap { case (key, msg) => - val optMsg = Graph.toGraphElement(msg).flatMap { element => - val arr = msg.split("\t", 7) - val service = element.serviceName - val label = arr(5) - val n = arr.length - - if (excludeServices.contains(service) || excludeLabels.contains(label)) { - None - } else if(n == 6) { - Some(Seq(msg, "{}", service).mkString("\t")) - } - else if(n == 7) { - Some(Seq(msg, service).mkString("\t")) - } - else { - None - } - } - optMsg - } - } - - val ts = time.milliseconds - val dateId = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts)) - - /** make sure that `elements` are not running at the same time */ - val elementsWritten = { - elements.cache() - (Array("all") ++ splits).foreach { - case split if split == "all" => - val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" - elements.saveAsTextFile(path) - case split => - val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" - val strlen = split.length - val splitData = elements.filter(_.takeRight(strlen) == split).cache() - val totalSize = splitData - .mapPartitions { iterator => - val s = iterator.map(_.length.toLong).sum - Iterator.single(s) - } - .sum - .toLong - val numPartitions = math.max(1, (totalSize / hdfsBlockSize.toDouble).toInt) - splitData.coalesce(math.min(splitData.partitions.length, numPartitions)).saveAsTextFile(path) - splitData.unpersist() - } - elements.unpersist() - elements - } - - elementsWritten.mapPartitionsWithIndex { (i, part) => - // commit offset range - val osr = offsets(i) - getStreamHelper(kafkaParams).commitConsumerOffset(osr) - Iterator.empty - }.foreach { - (_: Nothing) => () - } - - (Array("all") ++ splits).foreach { split => - val path = s"$outputPath/split=$split/date_id=$dateId/ts=$ts" - hiveContext.sql(s"use $hiveDatabase") - hiveContext.sql(s"alter table $hiveTable add partition (split='$split', date_id='$dateId', ts='$ts') location '$path'") - } - } - - logInfo(s"counter: ${mapAcc.value}") - println(s"counter: ${mapAcc.value}") - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala new file mode 100644 index 0000000..1f93134 --- /dev/null +++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/GraphSubscriberTest.scala @@ -0,0 +1,45 @@ +package org.apache.s2graph.loader.subscriber + +import org.apache.s2graph.core.Management +import org.apache.s2graph.spark.spark.WithKafka +import org.scalatest.{ FunSuite, Matchers } +import play.api.libs.json.{JsBoolean, 
JsNumber} + +class GraphSubscriberTest extends FunSuite with Matchers with WithKafka { + val phase = "dev" + val dbUrl = "jdbc:mysql://localhost:3306/graph_dev" + val zkQuorum = "localhost" + val kafkaBrokerList = "localhost:9099" + val currentTs = System.currentTimeMillis() + val op = "insertBulk" + val testLabelName = "s2graph_label_test" + val labelToReplace = "s2graph_label_test_new" + val serviceName = "s2graph" + val columnName = "user_id" + val columnType = "long" + val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0)) + val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false)) + val hTableName = "s2graph-dev_new" + val ttl = 86000 + val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}") + + GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) + + test("GraphSubscriberHelper.store") { + // actually we need to delete labelToReplace first for each test. + val labelMapping = Map(testLabelName -> labelToReplace) + GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, Some(hTableName)) + +// +// val msgs = (for { +// i <- (1 until 10) +// j <- (100 until 110) +// } yield { +// s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName" +// }).toSeq + val msgs = testStrings + +// val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping = labelMapping, autoCreateEdge = false)(None) +// println(stat) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala new file mode 100644 index 0000000..b0dd80d --- /dev/null +++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala @@ -0,0 +1,169 @@ +package org.apache.s2graph.loader.subscriber + +import org.apache.s2graph.core.Management +import org.apache.s2graph.core.types.HBaseType +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest._ +import TransferToHFile._ + +/** + * Created by Eric on 2015. 12. 2.. 
+ */ +class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers { + + private val master = "local[2]" + private val appName = "example-spark" + + private var sc: SparkContext = _ + + val dataWithoutDir = + """ + |1447686000000 insertBulk e a b friends_rel {} + |1447686000000 insertBulk e a c friends_rel {} + |1447686000000 insertBulk e a d friends_rel {} + |1447686000000 insertBulk e b d friends_rel {} + |1447686000000 insertBulk e b e friends_rel {} + """.stripMargin.trim + + val dataWithDir = + """ + |1447686000000 insertBulk e a b friends_rel {} out + |1447686000000 insertBulk e b a friends_rel {} in + |1447686000000 insertBulk e a c friends_rel {} out + |1447686000000 insertBulk e c a friends_rel {} in + |1447686000000 insertBulk e a d friends_rel {} out + |1447686000000 insertBulk e d a friends_rel {} in + |1447686000000 insertBulk e b d friends_rel {} out + |1447686000000 insertBulk e d b friends_rel {} in + |1447686000000 insertBulk e b e friends_rel {} out + |1447686000000 insertBulk e e b friends_rel {} in + """.stripMargin.trim + + override def beforeAll(): Unit = { + println("### beforeAll") + + GraphSubscriberHelper.apply("dev", "none", "none", "none") + // 1. create service + if(Management.findService("loader-test").isEmpty) { + println(">>> create service...") + Management.createService("loader-test", "localhost", "loader-test-dev", 1, None, "gz") + } + + // 2. create label + if(Management.findLabel("friends_rel").isEmpty) { + println(">>> create label...") + Management.createLabel( + "friends_rel", + "loader-test", "user_id", "string", + "loader-test", "user_id", "string", + true, + "loader-test", + Seq(), + Seq(), + "weak", + None, None, + HBaseType.DEFAULT_VERSION, + false, + Management.defaultCompressionAlgorithm + ) + } + + // create spark context + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + sc = new SparkContext(conf) + } + + override def afterAll(): Unit = { + println("### afterALL") + if (sc != null) { + sc.stop() + } + + Management.deleteLabel("friends_rel") + } + + "buildDegreePutRequest" should "transform degree to PutRequest" in { + val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L) + putReqs.size should equal(1) + } + + "toKeyValues" should "transform edges to KeyValues on edge format data without direction" in { + val rdd = sc.parallelize(dataWithoutDir.split("\n")) + + val kvs = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false) + } + kvs.foreach(println) + // edges * 2 (snapshot edges + indexed edges) + kvs.count() should equal(10) + + + val kvsAutoCreated = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true) + } + + // edges * 3 (snapshot edges + indexed edges + reverse edges) + kvsAutoCreated.count() should equal(15) + } + + "toKeyValues" should "transform edges to KeyValues on edge format data with direction" in { + val rdd = sc.parallelize(dataWithDir.split("\n")) + + val kvs = rdd.mapPartitions { iter => + GraphSubscriberHelper.apply("dev", "none", "none", "none") + TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false) + } + + // edges * 2 (snapshot edges + indexed edges) + kvs.count() should equal(20) + } + + "buildDegrees" should "build degrees on edge format data without direction" in { + val rdd = sc.parallelize(dataWithoutDir.split("\n")) 
+ + // autoCreate = false + val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + degrees.size should equal(2) + + degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + + + // autoCreate = true + val degreesAutoCreated = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], true).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + degreesAutoCreated.size should equal(6) + + degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 1L) + degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 1L) + degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 2L) + degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 1L) + } + + "buildDegrees" should "build degrees on edge format data with direction" in { + val rdd = sc.parallelize(dataWithDir.split("\n")) + + val degrees = TransferToHFile.buildDegrees(rdd, Map.empty[String, String], false).reduceByKey { (agg, current) => + agg + current + }.collectAsMap() + + degrees.size should equal(6) + + degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L) + degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L) + degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L) + degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L) + degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L) + degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/test/scala/subscriber/GraphSubscriberTest.scala ---------------------------------------------------------------------- diff --git a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala b/loader/src/test/scala/subscriber/GraphSubscriberTest.scala deleted file mode 100644 index f927509..0000000 --- a/loader/src/test/scala/subscriber/GraphSubscriberTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -package subscriber - -import com.kakao.s2graph.core.Management -import org.scalatest.{ FunSuite, Matchers } -import play.api.libs.json.{JsBoolean, JsNumber} -import s2.spark.WithKafka - -class GraphSubscriberTest extends FunSuite with Matchers with WithKafka { - val phase = "dev" - val dbUrl = "jdbc:mysql://localhost:3306/graph_dev" - val zkQuorum = "localhost" - val kafkaBrokerList = "localhost:9099" - val currentTs = System.currentTimeMillis() - val op = "insertBulk" - val testLabelName = "s2graph_label_test" - val labelToReplace = "s2graph_label_test_new" - val serviceName = "s2graph" - val columnName = "user_id" - val columnType = "long" - val indexProps = Seq("time" -> JsNumber(0), "weight" -> JsNumber(0)) - val props = Seq("is_hidden" -> JsBoolean(false), "is_blocked" -> JsBoolean(false)) - val hTableName = "s2graph-dev_new" - val ttl = 86000 - val testStrings = List("1431788400000\tinsertBulk\te\t147229417\t99240432\ts2graph_label_test\t{\"is_hidden\": true}") - - GraphSubscriberHelper.apply(phase, dbUrl, zkQuorum, kafkaBrokerList) - - test("GraphSubscriberHelper.store") { - // actually we need to delete labelToReplace first for each test. 
- val labelMapping = Map(testLabelName -> labelToReplace) - GraphSubscriberHelper.management.copyLabel(testLabelName, labelToReplace, Some(hTableName)) - -// -// val msgs = (for { -// i <- (1 until 10) -// j <- (100 until 110) -// } yield { -// s"$currentTs\t$op\tedge\t$i\t$j\t$testLabelName" -// }).toSeq - val msgs = testStrings - -// val stat = GraphSubscriberHelper.storeBulk(zkQuorum, hTableName)(msgs, labelMapping = labelMapping, autoCreateEdge = false)(None) -// println(stat) - } -}
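For reference, the core of the deleted TransferToHFile job above is the HFile bulk-load wiring: every HBase KeyValue produced by toKeyValues is turned into a (KeyFamilyQualifier, value) pair and handed to HBaseContext.bulkLoad together with per-family HFile write options. The sketch below condenses that wiring. It mirrors the calls shown in the diff and assumes the loader's own HBaseContext, KeyFamilyQualifier and FamilyHFileWriteOptions classes live in the spark package alongside HBaseRDDFunctions; it is an illustration of the deleted code, not a description of a public API.

    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
    import org.apache.hadoop.hbase.regionserver.BloomType
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier} // assumed location of the loader's helpers

    object BulkLoadSketch {
      // kvs: the KeyValues built by toKeyValues (plus optional degree KeyValues).
      def writeHFiles(sc: SparkContext, kvs: RDD[KeyValue],
                      zkQuorum: String, tableName: String, tmpPath: String): Unit = {
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
        val hbaseSc = new HBaseContext(sc, hbaseConf)

        // bulkLoad wants, for every KeyValue, its (row, family, qualifier) key and its raw value.
        def flatMap(kv: KeyValue): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
          val k = new KeyFamilyQualifier(kv.getRow(), kv.getFamily(), kv.getQualifier())
          Iterator.single(k -> kv.getValue())
        }

        // Same per-family options as the deleted job: LZ4 compression, ROW bloom filter,
        // 32 KB blocks and FAST_DIFF encoding for both the edge ("e") and vertex ("v") families.
        val familyOptions = new FamilyHFileWriteOptions(
          Algorithm.LZ4.getName.toUpperCase,
          BloomType.ROW.name().toUpperCase,
          32768,
          DataBlockEncoding.FAST_DIFF.name().toUpperCase)
        val familyOptionsMap = Map("e".getBytes() -> familyOptions, "v".getBytes() -> familyOptions)

        hbaseSc.bulkLoad(kvs, TableName.valueOf(tableName), flatMap, tmpPath, familyOptionsMap)
      }
    }

The HFiles land under tmpPath; completing the import into the running table (for example with HBase's LoadIncrementalHFiles tool) happens outside this job, which is why it builds KeyValues directly instead of issuing Puts.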
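The buildDegrees step exercised by TransferToHFileTest above (and, in older form, the deleted VertexDegreeBuilder) boils down to one counting pass: each bulk edge line yields a (vertex, label, direction) key, the reversed key is added when edge auto-creation is on, and the keys are summed with reduceByKey. A self-contained sketch of that pass, where a plain tab split stands in for GraphUtil.split and DegreeCount/countDegrees are illustrative names rather than the project's DegreeKey/buildDegrees:

    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for the loader's DegreeKey: (vertexId, labelName, direction).
    case class DegreeCount(vertexId: String, labelName: String, direction: String)

    object DegreeSketch {
      // Count degrees from bulk edge lines of the form
      // ts \t op \t e \t from \t to \t label [\t props [\t direction]].
      def countDegrees(lines: RDD[String],
                       labelMapping: Map[String, String],
                       edgeAutoCreate: Boolean): RDD[(DegreeCount, Long)] = {
        lines.flatMap { line =>
          val tokens = line.split("\t", -1)
          if (tokens.length < 6 || (tokens(2) != "e" && tokens(2) != "edge")) Nil
          else {
            val label = labelMapping.getOrElse(tokens(5), tokens(5))
            // the optional 8th column carries the direction; missing or unknown values mean "out"
            val dir = if (tokens.length >= 8 && tokens(7) == "in") "in" else "out"
            val reverseDir = if (dir == "out") "in" else "out"

            val forward = DegreeCount(tokens(3), label, dir) -> 1L
            // with edge auto-creation the reversed edge adds a degree for the other endpoint
            if (edgeAutoCreate) Seq(forward, DegreeCount(tokens(4), label, reverseDir) -> 1L)
            else Seq(forward)
          }
        }.reduceByKey(_ + _)
      }
    }

Run against the dataWithoutDir fixture with edgeAutoCreate = false, this produces exactly the two out-degree counts the test asserts (a -> 3, b -> 2); with edgeAutoCreate = true it adds the four in-degree keys the test checks as well.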
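The deleted WalLogStat job reduces every micro-batch to counts per service/label/operation/log-type key and publishes one Kafka message per key, each message also carrying the batch total. With the s2graph-specific parsing stripped away, the aggregate-and-publish step looks like the sketch below; it reuses the old Scala producer API (kafka.producer.KeyedMessage) that the job itself uses, and the producer, topic and batch timestamp are assumed to come from the surrounding job.

    import kafka.producer.{KeyedMessage, Producer}
    import org.apache.spark.rdd.RDD

    object WalLogStatSketch {
      // elements: one tab-joined "service \t label \t operation \t log_type" line per wal log entry.
      def publishCounts(elements: RDD[String],
                        producer: Producer[String, String],
                        statTopic: String,
                        ts: Long): Unit = {
        // count occurrences of each key in this batch, then derive the batch total
        val countByKey = elements.map(_ -> 1L).reduceByKey(_ + _).collect()
        val totalCount = countByKey.map(_._2).sum

        // one message per key: timestamp, key, per-key count, batch total
        val messages = countByKey.map { case (key, value) =>
          new KeyedMessage[String, String](statTopic, s"$ts\t$key\t$value\t$totalCount")
        }
        producer.send(messages: _*)
      }
    }

Because countByKey is collected to the driver before publishing, this pattern assumes the per-batch key space (distinct service/label/operation/log-type combinations) stays small.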
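The deleted WalLogToHDFS job sizes each split's output by measuring the bytes in the RDD and coalescing to roughly one partition per 128 MB HDFS block before saveAsTextFile. The same calculation in isolation, as a sketch with an illustrative name (coalesceToBlockSize is not a project method):

    import org.apache.spark.rdd.RDD

    object OutputSizingSketch {
      val HdfsBlockSize: Long = 128L * 1024 * 1024 // 128 MB, the value hard-coded in WalLogToHDFS

      // Coalesce lines so that each saved file is roughly one HDFS block.
      def coalesceToBlockSize(lines: RDD[String]): RDD[String] = {
        // one partial size per partition, then a single sum on the driver
        val totalSize = lines
          .mapPartitions(iter => Iterator.single(iter.map(_.length.toLong).sum))
          .sum()
          .toLong

        // at least one partition, and never more partitions than the RDD already has
        val numPartitions = math.max(1, (totalSize / HdfsBlockSize.toDouble).toInt)
        lines.coalesce(math.min(lines.partitions.length, numPartitions))
      }
    }

Note that _.length counts characters rather than bytes, so multi-byte UTF-8 content is undercounted; the original job treats this as a sizing heuristic, not an exact block fit.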
