add s2graph sink for spark structured streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b14b293d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b14b293d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b14b293d

Branch: refs/heads/master
Commit: b14b293d478571dbca7936c64adbe119e564fc43
Parents: e0ba0aa
Author: Chul Kang <[email protected]>
Authored: Thu Mar 22 18:53:18 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:02:57 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  13 +++
 .../s2graph/spark/sql/streaming/Logger.scala    |   7 ++
 .../spark/sql/streaming/S2CommitProtocol.scala  |  48 ++++++++
 .../spark/sql/streaming/S2SinkConfigs.scala     |  28 +++++
 .../spark/sql/streaming/S2SinkContext.scala     |  25 ++++
 .../spark/sql/streaming/S2SinkMetadataLog.scala |  25 ++++
 .../spark/sql/streaming/S2SinkProvider.scala    |  26 +++++
 .../spark/sql/streaming/S2SinkStatus.scala      |   9 ++
 .../sql/streaming/S2SparkSqlStreamingSink.scala |  71 ++++++++++++
 .../sql/streaming/S2StreamQueryWriter.scala     | 114 +++++++++++++++++++
 10 files changed, 366 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 6249891..0b83a6a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -163,3 +163,16 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
   }
 }
 
+/**
+  * S2graphSink
+  * @param queryName
+  * @param conf
+  */
+class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set()
+  override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
+
+  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
+    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
new file mode 100644
index 0000000..06232c5
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import org.apache.commons.logging.{Log, LogFactory}
+
+trait Logger {
+  @transient lazy val logger: Log = LogFactory.getLog(this.getClass)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
new file mode 100644
index 0000000..162b9ae
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
@@ -0,0 +1,48 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import org.apache.spark.sql.execution.streaming.MetadataLog
+
+class S2CommitProtocol(@transient val commitLog: MetadataLog[Array[S2SinkStatus]])
+  extends Serializable with Logger {
+
+  def initJob(jobState: JobState): Unit = {
+    logger.debug(s"[InitJob] ${jobState}")
+  }
+
+  def commitJob(jobState: JobState, taskCommits: Seq[TaskCommit]): Unit = {
+    val commits = taskCommits.flatMap(_.statuses).toArray[S2SinkStatus]
+    if (commitLog.add(jobState.batchId, commits)) {
+      logger.debug(s"[Committed batch] ${jobState.batchId} (${taskCommits})")
+    } else {
+      throw new IllegalStateException(s"Batch Id [${jobState.batchId}] is already committed")
+    }
+  }
+
+  def abortJob(jobState: JobState): Unit = {
+    logger.info(s"[AbortJob] ${jobState}")
+  }
+
+  @transient var executionStart: Long = _
+
+  def initTask(): Unit = {
+    executionStart = System.currentTimeMillis()
+  }
+
+  def commitTask(taskState: TaskState): TaskCommit = {
+    TaskCommit(Some(S2SinkStatus(taskState.taskId, executionStart, taskState.successCnt, taskState.failCnt, taskState.counter)))
+  }
+
+  def abortTask(taskState: TaskState): Unit = {
+    logger.info(s"[AbortTask] ${taskState}")
+    TaskCommit(None)
+  }
+}
+
+case class JobState(jobId: String, batchId: Long, dataCnt: Long = -1L)
+case class TaskState(
+                      taskId: Int,
+                      successCnt:Long = -1L,
+                      failCnt:Long = -1L,
+                      counter:Map[String, Int] = Map.empty
+                    )
+case class TaskCommit(statuses: Option[S2SinkStatus])

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
new file mode 100644
index 0000000..c24df5c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -0,0 +1,28 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+
+import scala.util.Try
+
+object S2SinkConfigs {
+  val DB_DEFAULT_URL = "db.default.url"
+  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+
+
+  val DEFAULT_GROUPED_SIZE = "100"
+  val DEFAULT_WAIT_TIME_SECONDS = "5"
+
+  val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
+  val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
+  val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
+  val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
+  val S2_SINK_FILE_CLEANUP_DELAY = s"$S2_SINK_PREFIX.file.cleanup.delay"
+  val S2_SINK_DELETE_EXPIRED_LOG = s"$S2_SINK_PREFIX.delete.expired.log"
+  val S2_SINK_COMPACT_INTERVAL = s"$S2_SINK_PREFIX.compact.interval"
+  val S2_SINK_GROUPED_SIZE = s"$S2_SINK_PREFIX.grouped.size"
+  val S2_SINK_WAIT_TIME = s"$S2_SINK_PREFIX.wait.time"
+
+  def getConfigStringOpt(config:Config, path:String): Option[String] = Try(config.getString(path)).toOption
+
+  def getConfigString(config:Config, path:String, default:String): String = getConfigStringOpt(config, path).getOrElse(default)
+}
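
As a quick illustration of how these keys behave at runtime (not part of the patch; the option map below is hypothetical), getConfigString falls back to the DEFAULT_* value whenever a key is absent from the sink options:

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._
    import scala.util.Try

    // Hypothetical sink options, as they would arrive from .option(...) calls on a stream writer.
    val parameters = Map("s2.spark.sql.streaming.sink.grouped.size" -> "200")
    val config = ConfigFactory.parseMap(parameters.asJava)

    // Mirrors S2SinkConfigs.getConfigString: a present key wins, otherwise the default string is used.
    val groupedSize = Try(config.getString("s2.spark.sql.streaming.sink.grouped.size")).toOption.getOrElse("100").toInt
    val waitTime    = Try(config.getString("s2.spark.sql.streaming.sink.wait.time")).toOption.getOrElse("5").toInt
    // groupedSize == 200, waitTime == 5
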
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
new file mode 100644
index 0000000..74b3308
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.S2Graph
+
+import scala.concurrent.ExecutionContext
+
+class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global){
+  println(s">>>> S2SinkContext Created...")
+  private lazy val s2Graph = new S2Graph(config)
+  def getGraph: S2Graph = {
+    s2Graph
+  }
+}
+
+object S2SinkContext {
+  private var s2SinkContext:S2SinkContext = null
+
+  def apply(config:Config):S2SinkContext = {
+    if (s2SinkContext == null) {
+      s2SinkContext = new S2SinkContext(config)
+    }
+    s2SinkContext
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
new file mode 100644
index 0000000..eb985f1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog
+
+class S2SinkMetadataLog(sparkSession: SparkSession, config:Config, logPath:String)
+  extends CompactibleFileStreamLog[S2SinkStatus](S2SinkMetadataLog.VERSION, sparkSession, logPath) {
+  import S2SinkConfigs._
+
+  override protected def fileCleanupDelayMs: Long = getConfigStringOpt(config, S2_SINK_FILE_CLEANUP_DELAY)
+    .getOrElse("60").toLong
+
+  override protected def isDeletingExpiredLog: Boolean = getConfigStringOpt(config, S2_SINK_DELETE_EXPIRED_LOG)
+    .getOrElse("false").toBoolean
+
+  override protected def defaultCompactInterval: Int = getConfigStringOpt(config, S2_SINK_COMPACT_INTERVAL)
+    .getOrElse("60").toInt
+
+  override def compactLogs(logs: Seq[S2SinkStatus]): Seq[S2SinkStatus] = logs
+}
+
+object S2SinkMetadataLog {
+  private val VERSION = 1
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
new file mode 100644
index 0000000..6198c81
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+
+import scala.collection.JavaConversions._
+
+class S2SinkProvider extends StreamSinkProvider with DataSourceRegister with Logger {
+  override def createSink(
+                           sqlContext: SQLContext,
+                           parameters: Map[String, String],
+                           partitionColumns: Seq[String],
+                           outputMode: OutputMode): Sink = {
+
+    logger.info(s"S2SinkProvider options : ${parameters}")
+    val jobConf:Config = ConfigFactory.parseMap(parameters).withFallback(ConfigFactory.load())
+    logger.info(s"S2SinkProvider Configuration : ${jobConf.root().render(ConfigRenderOptions.concise())}")
+
+    new S2SparkSqlStreamingSink(sqlContext.sparkSession, jobConf)
+  }
+
+  override def shortName(): String = "s2graph"
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
new file mode 100644
index 0000000..9b5ab8e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
@@ -0,0 +1,9 @@
+package org.apache.s2graph.spark.sql.streaming
+
+case class S2SinkStatus(
+                         taskId: Int,
+                         execTimeMillis: Long,
+                         successCnt:Long,
+                         failCnt:Long,
+                         counter:Map[String, Int]
+                       )
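
With S2SinkProvider in place, a structured streaming query can write to S2Graph by pointing the stream writer at the provider class (the same value S2graphSink.FORMAT uses). A minimal sketch, assuming a Kafka source and placeholder connection values (the quorum, DB URL, checkpoint path, and the payload-parsing step are illustrative, not part of the patch):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("s2graph-sink-example").getOrCreate()

    // Assume `edges` is a streaming DataFrame shaped into the columns S2StreamQueryWriter expects:
    // timestamp, from, to, label, and optionally operation/elem/direction/props.
    val edges: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("subscribe", "edges")                          // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING) AS payload")
      // ... parse `payload` into the edge columns here ...

    val query = edges.writeStream
      .format("org.apache.s2graph.spark.sql.streaming.S2SinkProvider") // same value as S2graphSink.FORMAT
      .option("s2.spark.sql.streaming.sink.grouped.size", "100")       // S2_SINK_GROUPED_SIZE
      .option("s2.spark.sql.streaming.sink.wait.time", "5")            // S2_SINK_WAIT_TIME, in seconds
      .option("hbase.zookeeper.quorum", "localhost")                   // HBASE_ZOOKEEPER_QUORUM
      .option("db.default.url", "jdbc:mysql://localhost:3306/graph_dev") // DB_DEFAULT_URL
      .option("checkpointLocation", "/tmp/s2graph-sink-checkpoint")
      .start()

    query.awaitTermination()
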
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
new file mode 100644
index 0000000..e705a7a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
@@ -0,0 +1,71 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import java.util.UUID
+
+import com.typesafe.config.{Config, ConfigRenderOptions}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.streaming.{MetadataLog, Sink}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+class S2SparkSqlStreamingSink(
+                               sparkSession: SparkSession,
+                               config:Config
+                             ) extends Sink with Logger {
+  import S2SinkConfigs._
+
+  private val APP_NAME = "s2graph"
+
+  private val writeLog: MetadataLog[Array[S2SinkStatus]] = {
+    val logPath = getCommitLogPath(config)
+    logger.info(s"MetaDataLogPath: $logPath")
+
+    new S2SinkMetadataLog(sparkSession, config, logPath)
+  }
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val dataCnt = data.count()
+    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}, data cnt : ${dataCnt}")
+
+    if (batchId <= writeLog.getLatest().map(_._1).getOrElse(-1L)) {
+      logger.info(s"Skipping already committed batch [$batchId]")
+    } else {
+      val queryName = getConfigStringOpt(config, "queryname").getOrElse(UUID.randomUUID().toString)
+      val commitProtocol = new S2CommitProtocol(writeLog)
+      val jobState = JobState(queryName, batchId, dataCnt)
+      val serializedConfig = config.root().render(ConfigRenderOptions.concise())
+      val queryExecution = data.queryExecution
+      val schema = data.schema
+
+      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+        try {
+          val taskCommits = sparkSession.sparkContext.runJob(queryExecution.toRdd,
+            (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+              new S2StreamQueryWriter(serializedConfig, schema, commitProtocol).run(taskContext, iter)
+            }
+          )
+          commitProtocol.commitJob(jobState, taskCommits)
+        } catch {
+          case t: Throwable =>
+            commitProtocol.abortJob(jobState)
+            throw t;
+        }
+
+      }
+    }
+  }
+
+  private def getCommitLogPath(config:Config): String = {
+    val logPathOpt = getConfigStringOpt(config, S2_SINK_LOG_PATH)
+    val userCheckpointLocationOpt = getConfigStringOpt(config, S2_SINK_CHECKPOINT_LOCATION)
+
+    (logPathOpt, userCheckpointLocationOpt) match {
+      case (Some(logPath), _) => logPath
+      case (None, Some(userCheckpoint)) => s"$userCheckpoint/sinks/$APP_NAME"
+      case _ => throw new IllegalArgumentException(s"failed to get commit log path")
+    }
+  }
+
+  override def toString(): String = "S2GraphSink"
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
new file mode 100644
index 0000000..de5aea0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -0,0 +1,114 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+private [sql] class S2StreamQueryWriter(
+                                         serializedConf:String,
+                                         schema: StructType ,
+                                         commitProtocol: S2CommitProtocol
+                                       ) extends Serializable with Logger {
+  private val config = ConfigFactory.parseString(serializedConf)
+  private val s2SinkContext = S2SinkContext(config)
+  private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind()
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+
+  def run(taskContext: TaskContext, iters: Iterator[InternalRow]): TaskCommit = {
+    val taskId = s"stage-${taskContext.stageId()}, partition-${taskContext.partitionId()}, attempt-${taskContext.taskAttemptId()}"
+    val partitionId= taskContext.partitionId()
+
+    val s2Graph = s2SinkContext.getGraph
+    val groupedSize = getConfigString(config, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(config, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
+    commitProtocol.initTask()
+    try {
+      var list = new ListBuffer[(String, Int)]()
+      val rst = iters.flatMap(rowToEdge).grouped(groupedSize).flatMap{ elements =>
+        logger.debug(s"[$taskId][elements] ${elements.size} (${elements.map(e => e.toLogString).mkString(",\n")})")
+        elements.groupBy(_.serviceName).foreach{ case (service, elems) =>
+          list += ((service, elems.size))
+        }
+
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
+
+      val (success, fail) = rst.toSeq.partition(r => r.isSuccess)
+      val counter = list.groupBy(_._1).map{ case (service, t) =>
+        val sum = t.toList.map(_._2).sum
+        (service, sum)
+      }
+      logger.info(s"[$taskId] success : ${success.size}, fail : ${fail.size} ($counter)")
+
+
+      commitProtocol.commitTask(TaskState(partitionId, success.size, fail.size, counter))
+
+    } catch {
+      case t: Throwable =>
+        commitProtocol.abortTask(TaskState(partitionId))
+        throw t
+    }
+  }
+
+  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
+    val s2Graph = s2SinkContext.getGraph
+    val row = encoder.fromRow(internalRow)
+
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+      case Some(propsStr:String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!RESERVED_COLUMN.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
+    }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+        )
+      case _ =>
+        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
+        None
+    }
+  }
+
+  private def getRowValAny(row:Row, fieldName:String):Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
+  }
+}
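
For reference, the row layout consumed by rowToEdge above can be written down as a schema. This is a sketch derived from RESERVED_COLUMN and the two match arms in the code (field names come from the patch; nullability and the sample comments are assumptions):

    import org.apache.spark.sql.types._

    // Edge rows: columns recognised by S2StreamQueryWriter.rowToEdge.
    val edgeSchema = StructType(Seq(
      StructField("timestamp", LongType),    // required, event time in epoch millis
      StructField("operation", StringType),  // optional, defaults to "insert"
      StructField("elem", StringType),       // optional, "e"/"edge" or "v"/"vertex", defaults to "e"
      StructField("from", StringType),       // source vertex id
      StructField("to", StringType),         // target vertex id
      StructField("label", StringType),      // edge label name
      StructField("direction", StringType),  // optional, defaults to "out"
      StructField("props", StringType)       // optional JSON object; otherwise any non-reserved column becomes a property
    ))

    // Vertex rows instead carry: timestamp, elem = "v", id, service, column (+ props).
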
