Repository: carbondata
Updated Branches:
  refs/heads/master e58ca9f0c -> d5bec4dd7


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
new file mode 100644
index 0000000..e15458d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/Profiler.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.rpc.RpcEndpoint
+import org.apache.spark.util._
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * profiler endpoint utility object
+ */
+@InterfaceAudience.Developer(Array("profiler"))
+@InterfaceStability.Evolving
+object Profiler {
+  // whether the profiler is enabled or not
+  private var isEnable = CarbonProperties.getInstance().isEnableQueryStatistics
+
+  private var notInitialized = true
+
+  // map statementId -> ProfilerMessage[]
+  private lazy val statementMap = new util.HashMap[Long, ArrayBuffer[ProfilerMessage]]()
+
+  // map executionId -> ProfilerMessage[]
+  private lazy val executionMap = new util.HashMap[Long, ArrayBuffer[ProfilerMessage]]()
+
+  private val endpointName = "CarbonProfiler"
+
+  // setup EndpointRef to driver
+  private lazy val setupEndpointRef =
+    RpcUtils.makeDriverRef(endpointName, SparkEnv.get.conf, SparkEnv.get.rpcEnv)
+
+  /**
+   * set up the profiler endpoint and register the ProfilerListener
+   */
+  def initialize(sparkContext: SparkContext): Unit = this.synchronized {
+    invokeIfEnable {
+      if (notInitialized) {
+        notInitialized = false
+        SparkEnv.get.rpcEnv.setupEndpoint(endpointName, new ProfilerEndPoint())
+        sparkContext.addSparkListener(new ProfilerListener)
+      }
+    }
+  }
+
+  /**
+   * run the body only if the profiler is enabled
+   */
+  def invokeIfEnable(body: => Unit): Unit = {
+    if (isEnable) {
+      body
+    }
+  }
+
+  /**
+   * send message to driver
+   */
+  def send(message: ProfilerMessage): Unit = {
+    Profiler.setupEndpointRef.send(message)
+  }
+
+  /**
+   * add message to statementMap
+   */
+  def addStatementMessage(statementId: Long, message: ProfilerMessage): Unit = this.synchronized {
+    val profilerMessages = statementMap.get(statementId)
+    if (profilerMessages == null) {
+      statementMap.put(statementId, ArrayBuffer[ProfilerMessage](message))
+    } else {
+      profilerMessages += message
+    }
+  }
+
+  /**
+   * remove all messages of a statement by id
+   */
+  def removeStatementMessage(statementId: Long): ArrayBuffer[ProfilerMessage] = {
+    statementMap.remove(statementId)
+  }
+
+  /**
+   * add message to executionMap
+   */
+  def addExecutionMessage(executionId: Long, message: ProfilerMessage): Unit = this.synchronized {
+    val profilerMessages = executionMap.get(executionId)
+    if (profilerMessages == null) {
+      executionMap.put(executionId, ArrayBuffer[ProfilerMessage](message))
+    } else {
+      profilerMessages += message
+    }
+  }
+
+  /**
+   * remove all messages of an execution by id
+   */
+  def removeExecutionMessage(executionId: Long): ArrayBuffer[ProfilerMessage] = {
+    executionMap.remove(executionId)
+  }
+
+  def setIsEnable(isEnable: Boolean): Unit = {
+    Profiler.isEnable = isEnable
+  }
+}
+
+class ProfilerEndPoint extends RpcEndpoint {
+  override val rpcEnv = SparkEnv.get.rpcEnv
+
+  def processSQLStart(statementId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    ProfilerLogger.logStatementSummary(statementId, messages)
+  }
+
+  def processExecutionEnd(executionId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    ProfilerLogger.logExecutionSummary(executionId, messages)
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case sqlStart: SQLStart =>
+      if (sqlStart.isCommand) {
+        // for the command sql, print summary to log file
+        var messages = Profiler.removeStatementMessage(sqlStart.statementId)
+        if (messages != null) {
+          messages += sqlStart
+        } else {
+          messages = ArrayBuffer[ProfilerMessage](sqlStart)
+        }
+        processSQLStart(sqlStart.statementId, messages)
+      }
+    case optimizer: Optimizer =>
+      val messages = Profiler.removeStatementMessage(optimizer.statementId)
+      if (messages == null) {
+        // the statement is a command, just add it to statementMap
+        Profiler.addStatementMessage(optimizer.statementId, optimizer)
+      } else {
+        // this statement is a select query, print summary to log file
+        messages += optimizer
+        processSQLStart(optimizer.statementId, messages)
+      }
+    case getPartition: GetPartition =>
+      Profiler.addExecutionMessage(getPartition.executionId, getPartition)
+    case task: QueryTaskEnd =>
+      Profiler.addExecutionMessage(task.executionId, task)
+    case executionEnd: ExecutionEnd =>
+      // print execution summary to log file
+      val messages = Profiler.removeExecutionMessage(executionEnd.executionId)
+      if (messages != null) {
+        messages += executionEnd
+        processExecutionEnd(executionEnd.executionId, messages)
+      }
+  }
+}
+
+/**
+ * base trait for profiler messages
+ */
+trait ProfilerMessage
+
+case class SQLStart(
+    sqlText: String,
+    statementId: Long,
+    var startTime: Long = -1,
+    var parseEnd: Long = -1,
+    var analyzerEnd: Long = -1,
+    var endTime: Long = -1,
+    var isCommand: Boolean = false
+) extends ProfilerMessage
+
+case class Optimizer(
+    statementId: Long,
+    startTime: Long,
+    timeTaken: Long
+) extends ProfilerMessage
+
+case class ExecutionStart(
+    executionId: Long,
+    startTime: Long,
+    plan: String
+) extends ProfilerMessage
+
+case class ExecutionEnd(
+    executionId: Long,
+    endTime: Long
+) extends ProfilerMessage
+
+case class GetPartition(
+    executionId: Long,
+    tableName: String,
+    tablePath: String,
+    queryId: String,
+    numOfPartitions: Int,
+    startTime: Long,
+    endTime: Long,
+    getSplitsStart: Long,
+    getSplitsEnd: Long,
+    numSegments: Int,
+    numStreamSegments: Int,
+    numBlocks: Int,
+    distributeStart: Long,
+    distributeEnd: Long,
+    filter: String,
+    projection: String
+) extends ProfilerMessage with Comparable[GetPartition] {
+  override def compareTo(other: GetPartition): Int = {
+    queryId.compareTo(other.queryId)
+  }
+}
+
+case class QueryTaskEnd(
+    executionId: Long,
+    queryId: String,
+    values: Array[Long],
+    size: Long,
+    files: Array[String]
+) extends ProfilerMessage with Comparable[QueryTaskEnd] {
+  override def compareTo(other: QueryTaskEnd): Int = {
+    val result = this.queryId.compareTo(other.queryId)
+    if (result != 0) {
+      result
+    } else {
+      val task = this.values(1) - other.values(1)
+      if (task > 0) {
+        1
+      } else if (task < 0) {
+        -1
+      } else {
+        0
+      }
+    }
+  }
+}

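For context, a minimal sketch of how a driver-side component can report to this endpoint (illustrative only; the surrounding caller and the literal executionId are assumptions, not part of this patch):

    import org.apache.spark.sql.profiler.{ExecutionEnd, Profiler}

    // Both calls are defined in Profiler.scala above; the whole block is a
    // no-op unless query statistics are enabled in CarbonProperties.
    Profiler.invokeIfEnable {
      Profiler.send(ExecutionEnd(executionId = 0L, endTime = System.currentTimeMillis()))
    }

The endpoint buffers GetPartition and QueryTaskEnd messages per executionId and only prints a summary once the matching ExecutionEnd arrives.
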
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
new file mode 100644
index 0000000..24ff33f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerListener.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerTaskEnd, SparkListenerTaskGettingResult, SparkListenerTaskStart}
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+
+/**
+ * listens to SQL execution events
+ */
+private[profiler] class ProfilerListener extends SparkListener {
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    Profiler.invokeIfEnable {
+      event match {
+        case executionStart: SparkListenerSQLExecutionStart =>
+          Profiler.addExecutionMessage(
+            executionStart.executionId,
+            ExecutionStart(
+              executionStart.executionId,
+              executionStart.time,
+              executionStart.physicalPlanDescription
+            ))
+        case executionEnd: SparkListenerSQLExecutionEnd =>
+          Profiler.send(
+            ExecutionEnd(
+              executionEnd.executionId,
+              executionEnd.time
+            )
+          )
+        case _ =>
+      }
+    }
+  }
+}

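The listener above is not registered directly by users; Profiler.initialize does that and also creates the RPC endpoint. A rough sketch of turning the whole thing on (the property key is an assumption inferred from CarbonProperties.isEnableQueryStatistics, and it must be set before the Profiler object is first loaded, since isEnable is read once at that point):

    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.spark.sql.profiler.Profiler

    // assumed property key backing isEnableQueryStatistics
    CarbonProperties.getInstance().addProperty("enable.query.statistics", "true")
    // creates the "CarbonProfiler" endpoint and adds the ProfilerListener
    Profiler.initialize(sparkContext)   // sparkContext: an existing SparkContext

Profiler.setIsEnable(true) can also flip the flag at runtime, as shown in Profiler.scala above.
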
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
new file mode 100644
index 0000000..d474090
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/profiler/ProfilerLogger.scala
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import java.lang.{StringBuilder => JavaStringBuilder}
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Comparator
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.stats.TaskStatistics
+import org.apache.carbondata.core.util.CarbonUtil
+
+/**
+ * utility for logging profiler summaries
+ */
+private[profiler] object ProfilerLogger {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  lazy val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+
+  // format timestamp value
+  def format(time: Long): String = this.synchronized {
+    if (time < 0) {
+      ""
+    } else {
+      val timestamp = new Timestamp(time)
+      simpleDateFormat.format(timestamp)
+    }
+  }
+
+  def logStatementSummary(statementId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    LOGGER.info(new StatementSummary(statementId, messages).toString)
+  }
+
+  def logExecutionSummary(executionId: Long, messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    LOGGER.info(new ExecutionSummary(executionId, messages).toString)
+  }
+}
+
+/**
+ * summarize the statement information
+ */
+private[profiler] class StatementSummary(
+    statementId: Long,
+    messages: ArrayBuffer[ProfilerMessage]
+) {
+
+  private var sqlText: String = ""
+
+  private var isCommand = false
+
+  private var startTime: Long = -1
+
+  private var parserEnd: Long = -1
+
+  private var analyzerEnd: Long = -1
+
+  private var optimizerStart: Long = -1
+
+  private var optimizerEnd: Long = -1
+
+  private var optimizerTaken: Long = -1
+
+  private var endTime: Long = -1
+
+  // summarize the messages
+  messages.foreach {
+    case sqlStart: SQLStart =>
+      sqlText = sqlStart.sqlText.trim
+      isCommand = sqlStart.isCommand
+      startTime = sqlStart.startTime
+      parserEnd = sqlStart.parseEnd
+      analyzerEnd = sqlStart.analyzerEnd
+      endTime = sqlStart.endTime
+    case optimizer: Optimizer =>
+      optimizerTaken = optimizer.timeTaken
+      optimizerStart = optimizer.startTime
+      optimizerEnd = optimizerStart + optimizerTaken
+  }
+
+  private def totalTaken: Long = endTime - startTime
+
+  private def parserTaken: Long = parserEnd - startTime
+
+  private def analyzerTaken: Long = analyzerEnd - parserEnd
+
+  private def parserToOptimizer: Long = optimizerEnd - startTime
+
+  private def commandTaken: Long = endTime - analyzerEnd
+
+  override def toString: String = {
+    if (isCommand) {
+      buildForCommand()
+    } else {
+      buildForQuery()
+    }
+  }
+
+  /**
+   * for example
+   * [statement id]: 1
+   * [sql text]:
+   * +-----------------------------------------------------------+
+   * |CREATE TABLE carbon_table(                                 |
+   * | shortField SHORT,                                         |
+   * | intField INT,                                             |
+   * | bigintField LONG,                                         |
+   * | doubleField DOUBLE,                                       |
+   * | stringField STRING,                                       |
+   * | timestampField TIMESTAMP,                                 |
+   * | decimalField DECIMAL(18,2),                               |
+   * | dateField DATE,                                           |
+   * | charField CHAR(5),                                        |
+   * | floatField FLOAT                                          |
+   * | )                                                         |
+   * | STORED BY 'carbondata'                                    |
+   * | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')|
+   * +-----------------------------------------------------------+
+   * [start time]: 2018-03-22 17:12:18.310
+   * [driver side total taken]: 1255 ms
+   *   |__ 1.parser taken: 203 ms
+   *   |__ 2.analyzer taken: 1 ms
+   *   |__ 3.execution taken: 1051 ms
+   *   |__ ...
+   */
+  private def buildForCommand(): String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[statement id]: ${ statementId }")
+    builder.append(s"\n[sql text]:\n")
+    CarbonUtil.logTable(builder, sqlText, "")
+    builder.append(s"\n[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[driver side total taken]: $totalTaken ms\n")
+    builder.append(s"  |__ 1.parser taken: $parserTaken ms\n")
+    builder.append(s"  |__ 2.analyzer taken: $analyzerTaken ms\n")
+    builder.append(s"  |__ 3.execution taken: $commandTaken ms")
+    builder.toString
+  }
+
+  /**
+   * for example
+   * [statement id]: 4
+   * [sql text]:
+   * +--------------------------------------------------+
+   * |SELECT charField, stringField, intField           |
+   * | FROM carbon_table                                |
+   * | WHERE stringfield = 'spark' AND decimalField > 40|
+   * +--------------------------------------------------+
+   * [start time]: 2018-03-22 17:12:22.001
+   * [driver side total taken]: 476ms
+   * |__ 1.parser taken: 82 ms
+   * |__ 2.analyzer taken: 195 ms
+   * |__ 3.(149ms)
+   * |__ 4.carbon optimizer taken: 50 ms
+   *       end time: 2018-03-22 17:12:22.477
+   */
+  private def buildForQuery(): String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[statement id]: ${ statementId }")
+    builder.append(s"\n[sql text]:\n")
+    CarbonUtil.logTable(builder, sqlText, "")
+    builder.append(s"\n[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[driver side total taken]: ${ parserToOptimizer }ms\n")
+    builder.append(s"  |__ 1.parser taken: $parserTaken ms\n")
+    builder.append(s"  |__ 2.analyzer taken: $analyzerTaken ms\n")
+    builder.append(s"  |__ 3.(${ optimizerStart - analyzerEnd }ms)\n")
+    builder.append(s"  |__ 4.carbon optimizer taken: $optimizerTaken ms\n")
+    builder.append(s"        end time: ${ ProfilerLogger.format(optimizerEnd) }")
+    builder.toString
+  }
+}
+
+/**
+ * summarize the execution information
+ */
+private[profiler] class ExecutionSummary(
+    executionId: Long,
+    messages: ArrayBuffer[ProfilerMessage]
+) {
+
+  private var sqlPlan: String = ""
+
+  private var startTime: Long = -1
+
+  private var endTime: Long = -1
+
+  private val partitions = new util.ArrayList[GetPartition]()
+
+  private val tasks = new util.ArrayList[TaskStatistics]()
+
+  // summarize the messages
+  messages.foreach {
+    case start: ExecutionStart =>
+      sqlPlan = start.plan
+      startTime = start.startTime
+    case end: ExecutionEnd =>
+      endTime = end.endTime
+    case partition: GetPartition =>
+      partitions.add(partition)
+    case task: QueryTaskEnd =>
+      tasks.add(new TaskStatistics(task.queryId, task.values, task.size, task.files))
+  }
+
+  private def totalTaken: Long = endTime - startTime
+
+  /**
+   * for example
+   * (prepare inputFormat 55ms)~(getSplits 90ms)~(distributeSplits 4ms)~(3ms)
+   */
+  private def detail(p: GetPartition, builder: JavaStringBuilder): Unit = {
+    builder.append("(prepare inputFormat ").append(p.getSplitsStart - p.startTime).append("ms)~")
+    builder.append("(getSplits ").append(p.getSplitsEnd - p.getSplitsStart).append("ms)~")
+    val gap1 = p.distributeStart - p.getSplitsEnd
+    if (gap1 > 0) {
+      builder.append("(").append(gap1).append("ms)~")
+    }
+    builder.append("(distributeSplits ").append(p.distributeEnd - p.distributeStart).append("ms)")
+    val gap2 = p.endTime - p.distributeEnd
+    if (gap2 > 0) {
+      builder.append("~(").append(gap2).append("ms)")
+    }
+  }
+
+  private def printGetPartitionTable(builder: JavaStringBuilder, indent: String): Unit = {
+    util.Collections.sort(partitions)
+    for (rowIndex <- 0 until partitions.size()) {
+      val partition = partitions.get(rowIndex)
+      val content = new JavaStringBuilder()
+      content.append("query_id: ").append(partition.queryId).append("\n")
+        .append("table_name: ").append(partition.tableName).append("\n")
+        .append("table_path: ").append(partition.tablePath).append("\n")
+        .append("start_time: ").append(ProfilerLogger.format(partition.startTime)).append("\n")
+      // total time
+      content.append("total_time: ").append(partition.endTime - partition.startTime).append("ms")
+      content.append(" [")
+      detail(partition, content)
+      content.append("]\n")
+      content.append("valid_segment_num: ").append(partition.numSegments).append("\n")
+        .append("stream_segment_num: ").append(partition.numStreamSegments).append("\n")
+        .append("hit_data_block: ").append(partition.numBlocks).append("\n")
+        .append("spark_partition_num: ").append(partition.numOfPartitions).append("\n")
+        .append("pushed_filter: ").append(partition.filter).append("\n")
+        .append("pushed_projection: ").append(partition.projection)
+      if (rowIndex > 0) {
+        builder.append("\n")
+      }
+      CarbonUtil.logTable(builder, content.toString, indent)
+    }
+  }
+
+  private def printStatisticTable(builder: JavaStringBuilder, indent: String): Unit = {
+    util.Collections.sort(tasks, comparator)
+    TaskStatistics.printStatisticTable(tasks, builder, indent)
+  }
+
+  private def printInputFiles(builder: JavaStringBuilder, indent: String): Unit = {
+    for (taskIndex <- 0 until tasks.size()) {
+      val task = tasks.get(taskIndex)
+      val content = new JavaStringBuilder()
+      content.append("query_id: ").append(task.getQueryId).append("\n")
+        .append("task_id: ").append(task.getValues()(1)).append("\n")
+        .append("total_size: ").append(task.getFileSize).append("Byte\n")
+        .append("file_list: ")
+      task.getFiles.foreach { file =>
+        content.append("\n  Segment_").append(file)
+      }
+      if (taskIndex > 0) {
+        builder.append("\n")
+      }
+      CarbonUtil.logTable(builder, content.toString, indent)
+    }
+  }
+
+  // used to sort tasks
+  private lazy val comparator = new Comparator[TaskStatistics]() {
+    override def compare(o1: TaskStatistics,
+        o2: TaskStatistics) = {
+      val result = o1.getQueryId.compareTo(o2.getQueryId())
+      if (result != 0) {
+        result
+      } else {
+        val task = o1.getValues()(1) - o2.getValues()(1)
+        if (task > 0) {
+          1
+        } else if (task < 0) {
+          -1
+        } else {
+          0
+        }
+      }
+    }
+  }
+
+  // scalastyle:off
+  /**
+   * for example
+   * [execution id]: 0
+   * [start time]: 2018-03-22 17:12:22.608
+   * [executor side total taken]: 845 ms
+   *   |_1.getPartition
+   *     +------------------------------------------------------------------------------------------------------------------+
+   *     |query_id: 23737310772188                                                                                           |
+   *     |table_name: default.carbon_table                                                                                   |
+   *     |table_path: /carbondata/examples/spark2/target/store/default/carbon_table                                          |
+   *     |start_time: 2018-03-22 17:12:23.141                                                                                |
+   *     |total_time: 152ms [(prepare inputFormat 55ms)~(getSplits 90ms)~(distributeSplits 4ms)~(3ms)]                       |
+   *     |valid_segment_num: 1                                                                                               |
+   *     |stream_segment_num: 1                                                                                              |
+   *     |hit_data_block: 1                                                                                                  |
+   *     |spark_partition_num: 1                                                                                             |
+   *     |pushed_filter: (((stringfield <> null and decimalfield <> null) and stringfield = spark) and decimalfield > 40.00) |
+   *     |pushed_projection: charfield,stringfield,intfield                                                                  |
+   *     +------------------------------------------------------------------------------------------------------------------+
+   *   |_2.task statistics
+   *     +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   *     |query_id      |task_id|start_time             |total_time|load_blocks_time|load_dictionary_time|carbon_scan_time|carbon_IO_time|scan_blocks_num|total_blocklets|valid_blocklets|total_pages|scanned_pages|valid_pages|result_size|
+   *     +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   *     |23737310772188|      0|2018-03-22 17:12:23.334|     106ms|             3ms|                 0ms|            -1ms|          -1ms|              1|              1|              1|          0|            1|          1|          3|
+   *     +--------------+-------+-----------------------+----------+----------------+--------------------+----------------+--------------+---------------+---------------+---------------+-----------+-------------+-----------+-----------+
+   *   |_3.input files for each task
+   *     +--------------------------------------------------------+
+   *     |query_id: 23737310772188                                |
+   *     |task_id: 0                                              |
+   *     |total_size: 2717Byte                                    |
+   *     |file_list:                                              |
+   *     |  Segment_0/part-0-0_batchno0-0-1521709939969.carbondata|
+   *     +--------------------------------------------------------+
+   * [sql plan]:
+   * +----------+
+   * | ...      |
+   * +----------+
+   */
+  // scalastyle:on
+  override def toString: String = {
+    val builder = new JavaStringBuilder(1000)
+    builder.append(s"\n[execution id]: ${ executionId }\n")
+    builder.append(s"[start time]: ${ ProfilerLogger.format(startTime) }\n")
+    builder.append(s"[executor side total taken]: $totalTaken ms")
+    builder.append(s"\n  |_1.getPartition\n")
+    printGetPartitionTable(builder, "    ")
+    builder.append(s"\n  |_2.task statistics\n")
+    printStatisticTable(builder, "    ")
+    builder.append(s"\n  |_3.input files for each task\n")
+    printInputFiles(builder, "    ")
+    builder.append(s"\n[sql plan]:\n")
+    CarbonUtil.logTable(builder, sqlPlan, "")
+    builder.toString
+  }
+}

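To make the log formats above concrete, here is a hedged sketch of the endpoint-side call that produces a statement summary (illustrative only: ProfilerLogger is private[profiler], so the real caller is ProfilerEndPoint, and the literal timestamps are made up):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.profiler._

    // a command-style statement: the SQLStart message alone is enough for a summary
    val messages = ArrayBuffer[ProfilerMessage](
      SQLStart("CREATE TABLE t1(a INT) STORED BY 'carbondata'", statementId = 1,
        startTime = 1521709938310L, parseEnd = 1521709938513L,
        analyzerEnd = 1521709938514L, endTime = 1521709939565L, isCommand = true))
    // emits the [statement id]/[sql text]/[driver side total taken] block shown above
    ProfilerLogger.logStatementSummary(1, messages)

Queries follow the same path, except that the Optimizer message is appended to the buffered SQLStart before the summary is printed.
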
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index bf958f8..f6bdff6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
@@ -24,9 +25,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, Union}
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.profiler.{Profiler, SQLStart}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,7 +43,8 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
  */
 class CarbonSession(@transient val sc: SparkContext,
-    @transient private val existingSharedState: Option[SharedState]) extends SparkSession(sc) {
+    @transient private val existingSharedState: Option[SharedState]
+) extends SparkSession(sc) { self =>
 
   def this(sc: SparkContext) {
     this(sc, None)
@@ -72,10 +77,47 @@ class CarbonSession(@transient val sc: SparkContext,
     new CarbonSession(sparkContext, Some(sharedState))
   }
 
+  override def sql(sqlText: String): DataFrame = {
+    val sse = SQLStart(sqlText, CarbonSession.statementId.getAndIncrement())
+    CarbonSession.threadStatementId.set(sse.statementId)
+    sse.startTime = System.currentTimeMillis()
+
+    try {
+      val logicalPlan = sessionState.sqlParser.parsePlan(sqlText)
+      sse.parseEnd = System.currentTimeMillis()
+
+      val qe = sessionState.executePlan(logicalPlan)
+      qe.assertAnalyzed()
+      sse.isCommand = qe.analyzed match {
+        case c: Command =>
+          true
+        case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
+          true
+        case _ =>
+          false
+      }
+      sse.analyzerEnd = System.currentTimeMillis()
+
+      new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
+    } finally {
+      Profiler.invokeIfEnable {
+        if (sse.isCommand) {
+          sse.endTime = System.currentTimeMillis()
+          Profiler.send(sse)
+        } else {
+          Profiler.addStatementMessage(sse.statementId, sse)
+        }
+      }
+    }
+  }
 }
 
 object CarbonSession {
 
+  private val statementId = new AtomicLong(0)
+
+  private[sql] val threadStatementId = new ThreadLocal[Long]()
+
   implicit class CarbonBuilder(builder: Builder) {
 
     def getOrCreateCarbonSession(): SparkSession = {
@@ -174,6 +216,8 @@ object CarbonSession {
         }
         options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
         SparkSession.setDefaultSession(session)
+        // Set up the profiler endpoint and register the ProfilerListener
+        Profiler.initialize(sparkContext)
         // Register a successfully instantiated context to the singleton. This should be at the
         // end of the class definition so that the singleton is updated only if there is no
         // exception in the construction of the instance.

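From the user's side nothing changes in how SQL is issued; a sketch of the flow this patch instruments (the builder settings and the table are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._   // brings getOrCreateCarbonSession into scope

    val spark = SparkSession.builder().master("local").appName("profiler-demo").getOrCreateCarbonSession()
    spark.sql("CREATE TABLE t1(a INT) STORED BY 'carbondata'")   // command: SQLStart is sent to the endpoint right away
    spark.sql("SELECT a FROM t1 WHERE a > 10").show()            // query: SQLStart is buffered until CarbonLateDecodeRule reports the optimizer time
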
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index b7df9b4..aa01c38 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -73,6 +73,10 @@ class SparkUnknownExpression(
     sparkExp.toString()
   }
 
+  override def getStatement: String = {
+    sparkExp.toString()
+  }
+
   def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = {
     this.evaluateExpression = evaluateExpression
     isExecutor = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 0aa7514..e99f502 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.command.mutation.CarbonProjectForUpdateCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.profiler.{Optimizer, Profiler}
 import org.apache.spark.sql.types.{IntegerType, StringType}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -58,6 +58,15 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         System.currentTimeMillis)
       recorder.recordStatistics(queryStatistic)
       recorder.logStatistics()
+      Profiler.invokeIfEnable {
+        Profiler.send(
+          Optimizer(
+            CarbonSession.threadStatementId.get(),
+            queryStatistic.getStartTime,
+            queryStatistic.getTimeTaken
+          )
+        )
+      }
       result
     } else {
       LOGGER.info("Skip CarbonOptimizer")
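
The statement id sent here is the correlation key between the two sides of the profiler; a condensed view of the two lines involved (quoted from the diffs above, simplified, and relying on the rule running on the same thread that issued the statement, which is what the ThreadLocal assumes):

    // CarbonSession.sql, on the driver thread that issued the statement
    CarbonSession.threadStatementId.set(sse.statementId)

    // CarbonLateDecodeRule.apply, later on the same thread, tags its timing with that id
    Profiler.send(Optimizer(CarbonSession.threadStatementId.get(), queryStatistic.getStartTime, queryStatistic.getTimeTaken))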

Reply via email to