Repository: carbondata
Updated Branches:
  refs/heads/master e58ca9f0c -> d5bec4dd7
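For orientation before the diff: everything below is gated on CarbonData's query-statistics switch, which Profiler.isEnable reads through CarbonProperties.getInstance().isEnableQueryStatistics. A minimal usage sketch (not part of this patch; it assumes the "enable.query.statistics" property key is what backs that flag):

    // Sketch: turn the profiler on before the CarbonSession is built.
    // Assumption: "enable.query.statistics" is the key behind
    // CarbonProperties.isEnableQueryStatistics.
    import org.apache.carbondata.core.util.CarbonProperties

    CarbonProperties.getInstance()
      .addProperty("enable.query.statistics", "true")
    // Profiler.initialize(sparkContext) then runs during CarbonSession
    // creation (see the CarbonSession.scala hunk below), registering the
    // CarbonProfiler RPC endpoint and the ProfilerListener.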
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
new file mode 100644
index 0000000..e15458d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.rpc.RpcEndpoint
+import org.apache.spark.util._
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * profiler endpoint util object
+ */
+@InterfaceAudience.Developer(Array("profiler"))
+@InterfaceStability.Evolving
+object Profiler {
+  // whether the profiler is enabled or not
+  private var isEnable = CarbonProperties.getInstance().isEnableQueryStatistics
+
+  private var notInitialized = true
+
+  // map statementId -> ProfilerMessage[]
+  private lazy val statementMap = new util.HashMap[Long, ArrayBuffer[ProfilerMessage]]()
+
+  // map executionId -> ProfilerMessage[]
+  private lazy val executionMap = new util.HashMap[Long, ArrayBuffer[ProfilerMessage]]()
+
+  private val endpointName = "CarbonProfiler"
+
+  // set up an EndpointRef to the driver
+  private lazy val setupEndpointRef =
+    RpcUtils.makeDriverRef(endpointName, SparkEnv.get.conf, SparkEnv.get.rpcEnv)
+
+  /**
+   * set up the profiler endpoint and register the ProfilerListener
+   */
+  def initialize(sparkContext: SparkContext): Unit = this.synchronized {
+    invokeIfEnable {
+      if (notInitialized) {
+        notInitialized = false
+        SparkEnv.get.rpcEnv.setupEndpoint(endpointName, new ProfilerEndPoint())
+        sparkContext.addSparkListener(new ProfilerListener)
+      }
+    }
+  }
+
+  /**
+   * run body only if the profiler is enabled
+   */
+  def invokeIfEnable(body: => Unit): Unit = {
+    if (isEnable) {
+      body
+    }
+  }
+
+  /**
+   * send a message to the driver
+   */
+  def send(message: ProfilerMessage): Unit = {
+    Profiler.setupEndpointRef.send(message)
+  }
+
+  /**
+   * add a message to statementMap
+   */
+  def addStatementMessage(statementId: Long, message: ProfilerMessage): Unit = this.synchronized {
+    val profilerMessages = statementMap.get(statementId)
+    if (profilerMessages == null) {
+      statementMap.put(statementId, ArrayBuffer[ProfilerMessage](message))
+    } else {
+      profilerMessages += message
+    }
+  }
+
+  /**
+   * remove all messages of a statement by id
+   */
+  def removeStatementMessage(statementId: Long): ArrayBuffer[ProfilerMessage] = {
+    statementMap.remove(statementId)
+  }
+
+  /**
+   * add a message to executionMap
+   */
+  def addExecutionMessage(executionId: Long, message: ProfilerMessage): Unit = this.synchronized {
+    val profilerMessages = executionMap.get(executionId)
+    if (profilerMessages == null) {
+      executionMap.put(executionId, ArrayBuffer[ProfilerMessage](message))
+    } else {
+      profilerMessages += message
+    }
+  }
+
+  /**
+   * remove all messages of an execution by id
+   */
+  def removeExecutionMessage(executionId: Long): ArrayBuffer[ProfilerMessage] = {
+    executionMap.remove(executionId)
+  }
+
+  def setIsEnable(isEnable: Boolean): Unit = {
+    Profiler.isEnable = isEnable
+  }
+}
+
+class ProfilerEndPoint extends RpcEndpoint {
+  override val rpcEnv = SparkEnv.get.rpcEnv
+
+  def processSQLStart(statementId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    ProfilerLogger.logStatementSummary(statementId, messages)
+  }
+
+  def processExecutionEnd(executionId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    ProfilerLogger.logExecutionSummary(executionId, messages)
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case sqlStart: SQLStart =>
+      if (sqlStart.isCommand) {
+        // for a command sql, print the summary to the log file
+        var messages = Profiler.removeStatementMessage(sqlStart.statementId)
+        if (messages != null) {
+          messages += sqlStart
+        } else {
+          messages = ArrayBuffer[ProfilerMessage](sqlStart)
+        }
+        processSQLStart(sqlStart.statementId, messages)
+      }
+    case optimizer: Optimizer =>
+      val messages = Profiler.removeStatementMessage(optimizer.statementId)
+      if (messages == null) {
+        // the statement is a command, just add it to statementMap
+        Profiler.addStatementMessage(optimizer.statementId, optimizer)
+      } else {
+        // this statement is a select query, print the summary to the log file
+        messages += optimizer
+        processSQLStart(optimizer.statementId, messages)
+      }
+    case getPartition: GetPartition =>
+      Profiler.addExecutionMessage(getPartition.executionId, getPartition)
+    case task: QueryTaskEnd =>
+      Profiler.addExecutionMessage(task.executionId, task)
+    case executionEnd: ExecutionEnd =>
+      // print the execution summary to the log file
+      val messages = Profiler.removeExecutionMessage(executionEnd.executionId)
+      if (messages != null) {
+        messages += executionEnd
+        processExecutionEnd(executionEnd.executionId, messages)
+      }
+  }
+}
+
+/**
+ * base trait of profiler messages
+ */
+trait ProfilerMessage
+
+case class SQLStart(
+    sqlText: String,
+    statementId: Long,
+    var startTime: Long = -1,
+    var parseEnd: Long = -1,
+    var analyzerEnd: Long = -1,
+    var endTime: Long = -1,
+    var isCommand: Boolean = false
+) extends ProfilerMessage
+
+case class Optimizer(
+    statementId: Long,
+    startTime: Long,
+    timeTaken: Long
+) extends ProfilerMessage
+
+case class ExecutionStart(
+    executionId: Long,
+    startTime: Long,
+    plan: String
+) extends ProfilerMessage
+
+case class ExecutionEnd(
+    executionId: Long,
+    endTime: Long
+) extends ProfilerMessage
+
+case class GetPartition(
+    executionId: Long,
+    tableName: String,
+    tablePath: String,
+    queryId: String,
+    numOfPartitions: Int,
+    startTime: Long,
+    endTime: Long,
+    getSplitsStart: Long,
+    getSplitsEnd: Long,
+    numSegments: Int,
+    numStreamSegments: Int,
+    numBlocks: Int,
+    distributeStart: Long,
+    distributeEnd: Long,
+    filter: String,
+    projection: String
+) extends ProfilerMessage with Comparable[GetPartition] {
+  override def compareTo(other: GetPartition): Int = {
+    queryId.compareTo(other.queryId)
+  }
+}
+
+case class QueryTaskEnd(
+    executionId: Long,
+    queryId: String,
+    values: Array[Long],
+    size: Long,
+    files: Array[String]
+) extends ProfilerMessage with Comparable[QueryTaskEnd] {
+  override def compareTo(other: QueryTaskEnd): Int = {
+    val result = this.queryId.compareTo(other.queryId)
+    if (result != 0) {
+      result
+    } else {
+      val task = this.values(1) - other.values(1)
+      if (task > 0) {
+        1
+      } else if (task < 0) {
+        -1
+      } else {
+        0
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
new file mode 100644
index 0000000..24ff33f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerTaskEnd, SparkListenerTaskGettingResult, SparkListenerTaskStart}
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+
+/**
+ * listens to SQL execution events
+ */
+private[profiler] class ProfilerListener extends SparkListener {
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    Profiler.invokeIfEnable {
+      event match {
+        case executionStart: SparkListenerSQLExecutionStart =>
+          Profiler.addExecutionMessage(
+            executionStart.executionId,
+            ExecutionStart(
+              executionStart.executionId,
+              executionStart.time,
+              executionStart.physicalPlanDescription
+            ))
+        case executionEnd: SparkListenerSQLExecutionEnd =>
+          Profiler.send(
+            ExecutionEnd(
+              executionEnd.executionId,
+              executionEnd.time
+            )
+          )
+        case _ =>
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
new file mode 100644
index 0000000..d474090
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import java.lang.{StringBuilder => JavaStringBuilder}
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.stats.TaskStatistics
+import org.apache.carbondata.core.util.CarbonUtil
+
+/**
+ * util for profiler logging
+ */
+private[profiler] object ProfilerLogger {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  lazy val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+
+  // format a timestamp value
+  def format(time: Long): String = this.synchronized {
+    if (time < 0) {
+      ""
+    } else {
+      val timestamp = new Timestamp(time)
+      simpleDateFormat.format(timestamp)
+    }
+  }
+
+  def logStatementSummary(statementId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    LOGGER.info(new StatementSummary(statementId, messages).toString)
+  }
+
+  def logExecutionSummary(executionId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    LOGGER.info(new ExecutionSummary(executionId, messages).toString)
+  }
+}
+
+/**
+ * summarize the statement information
+ */
+private[profiler] class StatementSummary(
+    statementId: Long,
+    messages: ArrayBuffer[ProfilerMessage]
+) {
+
+  private var sqlText: String = ""
+
+  private var isCommand = false
+
+  private var startTime: Long = -1
+
+  private var parserEnd: Long = -1
+
+  private var analyzerEnd: Long = -1
+
+  private var optimizerStart: Long = -1
+
+  private var optimizerEnd: Long = -1
+
+  private var optimizerTaken: Long = -1
+
+  private var endTime: Long = -1
+
+  // summarize the messages
+  messages.foreach {
+    case sqlStart: SQLStart =>
+      sqlText = sqlStart.sqlText.trim
+      isCommand = sqlStart.isCommand
+      startTime = sqlStart.startTime
+      parserEnd = sqlStart.parseEnd
+      analyzerEnd = sqlStart.analyzerEnd
+      endTime = sqlStart.endTime
+    case optimizer: Optimizer =>
+      optimizerTaken = optimizer.timeTaken
+      optimizerStart = optimizer.startTime
+      optimizerEnd = optimizerStart + optimizerTaken
+  }
+
+  private def totalTaken: Long = endTime - startTime
+
+  private def parserTaken: Long = parserEnd - startTime
+
+  private def analyzerTaken: Long = analyzerEnd - parserEnd
+
+  private def parserToOptimizer: Long = optimizerEnd - startTime
+
+  private def commandTaken: Long = endTime - analyzerEnd
+
+  override def toString: String = {
+    if (isCommand) {
+      buildForCommand()
+    } else {
+      buildForQuery()
+    }
+  }
+
+  /**
+   * for example
+   * [statement id]: 1
+   * [sql text]:
+   * +-----------------------------------------------------------+
+   * |CREATE TABLE carbon_table(                                 |
+   * |   shortField SHORT,                                       |
+   * |   intField INT,                                           |
+   * |   bigintField LONG,                                       |
+   * |   doubleField DOUBLE,                                     |
+   * |   stringField STRING,                                     |
+   * |   timestampField TIMESTAMP,                               |
+   * |   decimalField DECIMAL(18,2),                             |
+   * |   dateField DATE,                                         |
+   * |   charField CHAR(5),                                      |
+   * |   floatField FLOAT                                        |
+   * | )                                                         |
+   * | STORED BY 'carbondata'                                    |
+   * | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')|
+   * +-----------------------------------------------------------+
+   * [start time]: 2018-03-22 17:12:18.310
+   * [driver side total taken]: 1255 ms
+   *  |__ 1.parser taken: 203 ms
+   *  |__ 2.analyzer taken: 1 ms
+   *  |__ 3.execution taken: 1051 ms
+   *  |__ ...
+   */
+  private def buildForCommand(): String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[statement id]: ${ statementId }")
+    builder.append(s"\n[sql text]:\n")
+    CarbonUtil.logTable(builder, sqlText, "")
+    builder.append(s"\n[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[driver side total taken]: $totalTaken ms\n")
+    builder.append(s" |__ 1.parser taken: $parserTaken ms\n")
+    builder.append(s" |__ 2.analyzer taken: $analyzerTaken ms\n")
+    builder.append(s" |__ 3.execution taken: $commandTaken ms")
+    builder.toString
+  }
+
+  /**
+   * for example
+   * [statement id]: 4
+   * [sql text]:
+   * +--------------------------------------------------+
+   * |SELECT charField, stringField, intField           |
+   * | FROM carbon_table                                |
+   * | WHERE stringfield = 'spark' AND decimalField > 40|
+   * +--------------------------------------------------+
+   * [start time]: 2018-03-22 17:12:22.001
+   * [driver side total taken]: 476ms
+   *  |__ 1.parser taken: 82 ms
+   *  |__ 2.analyzer taken: 195 ms
+   *  |__ 3.(149ms)
+   *  |__ 4.carbon optimizer taken: 50 ms
+   *  end time: 2018-03-22 17:12:22.477
+   */
+  private def buildForQuery(): String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[statement id]: ${ statementId }")
+    builder.append(s"\n[sql text]:\n")
+    CarbonUtil.logTable(builder, sqlText, "")
+    builder.append(s"\n[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[driver side total taken]: ${ parserToOptimizer }ms\n")
+    builder.append(s" |__ 1.parser taken: $parserTaken ms\n")
+    builder.append(s" |__ 2.analyzer taken: $analyzerTaken ms\n")
+    builder.append(s" |__ 3.(${ optimizerStart - analyzerEnd }ms)\n")
+    builder.append(s" |__ 4.carbon optimizer taken: $optimizerTaken ms\n")
+    builder.append(s" end time: ${ ProfilerLogger.format(optimizerEnd) }")
+    builder.toString
+  }
+}
+
+/**
+ * summarize the execution information
+ */
+private[profiler] class ExecutionSummary(
+    executionId: Long,
+    messages: ArrayBuffer[ProfilerMessage]
+) {
+
+  private var sqlPlan: String = ""
+
+  private var startTime: Long = -1
+
+  private var endTime: Long = -1
+
+  private val partitions = new util.ArrayList[GetPartition]()
+
+  private val tasks = new util.ArrayList[TaskStatistics]()
+
+  // summarize the messages
+  messages.foreach {
+    case start: ExecutionStart =>
+      sqlPlan = start.plan
+      startTime = start.startTime
+    case end: ExecutionEnd =>
+      endTime = end.endTime
+    case partition: GetPartition =>
+      partitions.add(partition)
+    case task: QueryTaskEnd =>
+      tasks.add(new TaskStatistics(task.queryId, task.values, task.size, task.files))
+  }
+
+  private def totalTaken: Long = endTime - startTime
+
+  /**
+   * for example
+   * (prepare inputFormat 55ms)~(getSplits 90ms)~(distributeSplits 4ms)~(3ms)
+   */
+  private def detail(p: GetPartition, builder: JavaStringBuilder): Unit = {
+    builder.append("(prepare inputFormat ").append(p.getSplitsStart - p.startTime).append("ms)~")
+    builder.append("(getSplits ").append(p.getSplitsEnd - p.getSplitsStart).append("ms)~")
+    val gap1 = p.distributeStart - p.getSplitsEnd
+    if (gap1 > 0) {
+      builder.append("(").append(gap1).append("ms)~")
+    }
+    builder.append("(distributeSplits ").append(p.distributeEnd - p.distributeStart).append("ms)")
+    val gap2 = p.endTime - p.distributeEnd
+    if (gap2 > 0) {
+      builder.append("~(").append(gap2).append("ms)")
+    }
+  }
+
+  private def printGetPartitionTable(builder: JavaStringBuilder, indent: String): Unit = {
+    util.Collections.sort(partitions)
+    for (rowIndex <- 0 until partitions.size()) {
+      val partition = partitions.get(rowIndex)
+      val content = new JavaStringBuilder()
+      content.append("query_id: ").append(partition.queryId).append("\n")
+        .append("table_name: ").append(partition.tableName).append("\n")
+        .append("table_path: ").append(partition.tablePath).append("\n")
+        .append("start_time: ").append(ProfilerLogger.format(partition.startTime)).append("\n")
+      // total time
+      content.append("total_time: ").append(partition.endTime - partition.startTime).append("ms")
+      content.append(" [")
+      detail(partition, content)
+      content.append("]\n")
+      content.append("valid_segment_num: ").append(partition.numSegments).append("\n")
+        .append("stream_segment_num: ").append(partition.numStreamSegments).append("\n")
+        .append("hit_data_block: ").append(partition.numBlocks).append("\n")
+        .append("spark_partition_num: ").append(partition.numOfPartitions).append("\n")
+        .append("pushed_filter: ").append(partition.filter).append("\n")
+        .append("pushed_projection: ").append(partition.projection)
+      if (rowIndex > 0) {
+        builder.append("\n")
+      }
+      CarbonUtil.logTable(builder, content.toString, indent)
+    }
+  }
+
+  private def printStatisticTable(builder: JavaStringBuilder, indent: String): Unit = {
+    util.Collections.sort(tasks, comparator)
+    TaskStatistics.printStatisticTable(tasks, builder, indent)
+  }
+
+  private def printInputFiles(builder: JavaStringBuilder, indent: String): Unit = {
+    for (taskIndex <- 0 until tasks.size()) {
+      val task = tasks.get(taskIndex)
+      val content = new JavaStringBuilder()
+      content.append("query_id: ").append(task.getQueryId).append("\n")
+        .append("task_id: ").append(task.getValues()(1)).append("\n")
+        .append("total_size: ").append(task.getFileSize).append("Byte\n")
+        .append("file_list: ")
+      task.getFiles.foreach { file =>
+        content.append("\n Segment_").append(file)
+      }
+      if (taskIndex > 0) {
+        builder.append("\n")
+      }
+      CarbonUtil.logTable(builder, content.toString, indent)
+    }
+  }
+
+  // used to sort tasks
+  private lazy val comparator = new Comparator[TaskStatistics]() {
+    override def compare(o1: TaskStatistics,
+        o2: TaskStatistics) = {
+      val result = o1.getQueryId.compareTo(o2.getQueryId())
+      if (result != 0) {
+        result
+      } else {
+        val task = o1.getValues()(1) - o2.getValues()(1)
+        if (task > 0) {
+          1
+        } else if (task < 0) {
+          -1
+        } else {
+          0
+        }
+      }
+    }
+  }
+
+  // scalastyle:off
+  /**
+   * for example
+   * [execution id]: 0
+   * [start time]: 2018-03-22 17:12:22.608
+   * [executor side total taken]: 845 ms
+   *  |_1.getPartition
+   * +------------------------------------------------------------------------------------------------------------------+
+   * |query_id: 23737310772188                                                                                           |
+   * |table_name: default.carbon_table                                                                                   |
+   * |table_path: /carbondata/examples/spark2/target/store/default/carbon_table                                          |
+   * |start_time: 2018-03-22 17:12:23.141                                                                                |
+   * |total_time: 152ms [(prepare inputFormat 55ms)~(getSplits 90ms)~(distributeSplits 4ms)~(3ms)]                       |
+   * |valid_segment_num: 1                                                                                               |
+   * |stream_segment_num: 1                                                                                              |
+   * |hit_data_block: 1                                                                                                  |
+   * |spark_partition_num: 1                                                                                             |
+   * |pushed_filter: (((stringfield <> null and decimalfield <> null) and stringfield = spark) and decimalfield > 40.00) |
+   * |pushed_projection: charfield,stringfield,intfield                                                                  |
+   * +------------------------------------------------------------------------------------------------------------------+
+   *  |_2.task statistics
+   * +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   * |query_id      |task_id|start_time             |total_time|load_blocks_time|load_dictionary_time|carbon_scan_time|carbon_IO_time|scan_blocks_num|total_blocklets|valid_blocklets|total_pages|scanned_pages|valid_pages|result_size|
+   * +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   * |23737310772188|      0|2018-03-22 17:12:23.334|     106ms|             3ms|                 0ms|            -1ms|          -1ms|              1|              1|              1|          0|            1|          1|          3|
+   * +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   *  |_3.input files for each task
+   * +--------------------------------------------------------+
+   * |query_id: 23737310772188                                |
+   * |task_id: 0                                              |
+   * |total_size: 2717Byte                                    |
+   * |file_list:                                              |
+   * | Segment_0/part-0-0_batchno0-0-1521709939969.carbondata|
+   * +--------------------------------------------------------+
+   * [sql plan]:
+   * +----------+
+   * | ...      |
+   * +----------+
+   */
+  // scalastyle:on
+  override def toString: String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[execution id]: ${ executionId }\n")
+    builder.append(s"[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[executor side total taken]: $totalTaken ms")
+    builder.append(s"\n |_1.getPartition\n")
+    printGetPartitionTable(builder, " ")
+    builder.append(s"\n |_2.task statistics\n")
+    printStatisticTable(builder, " ")
+    builder.append(s"\n |_3.input files for each task\n")
+    printInputFiles(builder, " ")
+    builder.append(s"\n[sql plan]:\n")
+    CarbonUtil.logTable(builder, sqlPlan, "")
+    builder.toString
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index bf958f8..f6bdff6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -17,6 +17,7 @@ package org.apache.spark.sql
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
@@ -24,9 +25,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, Union}
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.profiler.{Profiler, SQLStart}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,7 +43,8 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
  * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
  */
 class CarbonSession(@transient val sc: SparkContext,
-    @transient private val existingSharedState: Option[SharedState]) extends SparkSession(sc) {
+    @transient private val existingSharedState: Option[SharedState]
+) extends SparkSession(sc) {
   self =>
 
   def this(sc: SparkContext) {
     this(sc, None)
@@ -72,10 +77,47 @@ class CarbonSession(@transient val sc: SparkContext,
     new CarbonSession(sparkContext, Some(sharedState))
   }
 
+  override def sql(sqlText: String): DataFrame = {
+    val sse = SQLStart(sqlText, CarbonSession.statementId.getAndIncrement())
+    CarbonSession.threadStatementId.set(sse.statementId)
+    sse.startTime = System.currentTimeMillis()
+
+    try {
+      val logicalPlan = sessionState.sqlParser.parsePlan(sqlText)
+      sse.parseEnd = System.currentTimeMillis()
+
+      val qe = sessionState.executePlan(logicalPlan)
+      qe.assertAnalyzed()
+      sse.isCommand = qe.analyzed match {
+        case c: Command =>
+          true
+        case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+          true
+        case _ =>
+          false
+      }
+      sse.analyzerEnd = System.currentTimeMillis()
+
+      new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+    } finally {
+      Profiler.invokeIfEnable {
+        if (sse.isCommand) {
+          sse.endTime = System.currentTimeMillis()
+          Profiler.send(sse)
+        } else {
+          Profiler.addStatementMessage(sse.statementId, sse)
+        }
+      }
+    }
+  }
 }
 
 object CarbonSession {
 
+  private val statementId = new AtomicLong(0)
+
+  private[sql] val threadStatementId = new ThreadLocal[Long]()
+
   implicit class CarbonBuilder(builder: Builder) {
 
     def getOrCreateCarbonSession(): SparkSession = {
@@ -174,6 +216,8 @@ object CarbonSession {
       }
       options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
       SparkSession.setDefaultSession(session)
+      // Setup profiler end point and register ProfilerListener
+      Profiler.initialize(sparkContext)
       // Register a successfully instantiated context to the singleton. This should be at the
       // end of the class definition so that the singleton is updated only if there is no
       // exception in the construction of the instance.
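The overridden sql() above stamps parse and analyzer boundaries onto a SQLStart message, and StatementSummary (ProfilerLogger.scala) turns those stamps into the per-phase durations shown in the log. A small illustration using the types from this patch (the timestamps are invented, chosen to reproduce the sample command log in ProfilerLogger.scala above):

    import org.apache.spark.sql.profiler.SQLStart

    val sse = SQLStart("CREATE TABLE t(a INT) STORED BY 'carbondata'", 1)
    sse.startTime = 1000L    // sql() entry
    sse.parseEnd = 1203L     // after parsePlan      -> parser taken:     203 ms
    sse.analyzerEnd = 1204L  // after assertAnalyzed -> analyzer taken:     1 ms
    sse.isCommand = true     // analyzed plan is a Command (or a Union of Commands)
    sse.endTime = 2255L      // command finished     -> execution taken: 1051 ms
    // driver side total taken = endTime - startTime = 1255 ms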
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index b7df9b4..aa01c38 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -73,6 +73,10 @@ class SparkUnknownExpression(
     sparkExp.toString()
   }
 
+  override def getStatement: String = {
+    sparkExp.toString()
+  }
+
   def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
     this.evaluateExpression = evaluateExpression
     isExecutor = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 0aa7514..e99f502 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.profiler.{Optimizer, Profiler}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -58,6 +58,15 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)
       recorder.logStatistics()
+      Profiler.invokeIfEnable {
+        Profiler.send(
+          Optimizer(
+            CarbonSession.threadStatementId.get(),
+            queryStatistic.getStartTime,
+            queryStatistic.getTimeTaken
+          )
+        )
+      }
       result
     } else {
       LOGGER.info("Skip CarbonOptimizer")
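CarbonLateDecodeRule closes the driver-side timeline for a select query: it sends an Optimizer message keyed by the thread-local statement id, and ProfilerEndPoint.receive pairs it with the SQLStart that CarbonSession.sql buffered earlier. A hedged trace of that pairing (values invented to match the query sample in ProfilerLogger.scala):

    import org.apache.spark.sql.profiler.{Optimizer, SQLStart}

    // 1. For a select query, CarbonSession.sql buffers the SQLStart locally
    //    (Profiler.addStatementMessage) instead of sending it over RPC.
    val sse = SQLStart("SELECT charField FROM carbon_table", 4)
    sse.startTime = 0L; sse.parseEnd = 82L; sse.analyzerEnd = 277L

    // 2. CarbonLateDecodeRule later sends this Optimizer message; receive()
    //    finds the buffered SQLStart for statementId 4 and logs the summary:
    val opt = Optimizer(statementId = 4, startTime = 426L, timeTaken = 50L)
    // parser 82 ms, analyzer 195 ms, gap before optimizer 149 ms,
    // carbon optimizer 50 ms -> driver side total (parser to optimizer) 476 ms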