[CARBONDATA-2271] Collect SQL execution information to driver side

This PR add support for collecting SQL execution information for profiling 
purpose. See CarbonSessionExample.scala, it will generate a separated log file 
containing profiling information

This closes #2087


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d5bec4dd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d5bec4dd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d5bec4dd

Branch: refs/heads/master
Commit: d5bec4dd7eb9b40fe3f6618093ba28e886e56b25
Parents: e58ca9f
Author: QiangCai <[email protected]>
Authored: Mon Mar 19 19:49:47 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Sat Mar 24 15:50:03 2018 +0800

----------------------------------------------------------------------
 .../core/scan/expression/BinaryExpression.java  |   3 +-
 .../core/scan/expression/ColumnExpression.java  |  17 +-
 .../core/scan/expression/Expression.java        |   1 +
 .../core/scan/expression/LiteralExpression.java |  17 +-
 .../core/scan/expression/MatchExpression.java   |   5 +
 .../conditional/EqualToExpression.java          |  17 +-
 .../GreaterThanEqualToExpression.java           |  11 +-
 .../conditional/GreaterThanExpression.java      |  13 +-
 .../expression/conditional/InExpression.java    |  13 +-
 .../conditional/LessThanEqualToExpression.java  |  10 +-
 .../conditional/LessThanExpression.java         |  10 +-
 .../expression/conditional/ListExpression.java  |  24 +-
 .../conditional/NotEqualsExpression.java        |  14 +-
 .../expression/conditional/NotInExpression.java |  13 +-
 .../conditional/StartsWithExpression.java       |  13 +-
 .../scan/expression/logical/AndExpression.java  |  13 +-
 .../expression/logical/FalseExpression.java     |  14 +-
 .../scan/expression/logical/OrExpression.java   |  13 +-
 .../expression/logical/RangeExpression.java     |  17 +-
 .../scan/expression/logical/TrueExpression.java |  14 +-
 .../carbondata/core/scan/model/QueryModel.java  |   4 +
 .../DriverQueryStatisticsRecorderDummy.java     |   8 +-
 .../DriverQueryStatisticsRecorderImpl.java      |   6 +-
 .../carbondata/core/stats/QueryStatistic.java   |   4 +
 .../core/stats/QueryStatisticsRecorder.java     |   4 +-
 .../stats/QueryStatisticsRecorderDummy.java     |   9 +-
 .../core/stats/QueryStatisticsRecorderImpl.java | 119 +-----
 .../carbondata/core/stats/TaskStatistics.java   | 164 +++++++++
 .../carbondata/core/util/CarbonProperties.java  |  22 ++
 .../apache/carbondata/core/util/CarbonUtil.java | 101 +++++
 .../stats/QueryStasticsRecorderImplTest.java    |   3 +-
 .../spark2/src/main/resources/log4j.properties  |   7 +
 .../examples/CarbonSessionExample.scala         |  25 +-
 .../hadoop/CarbonMultiBlockSplit.java           |   2 +-
 .../hadoop/api/CarbonFileInputFormat.java       |   2 +
 .../hadoop/api/CarbonInputFormat.java           |  17 +
 .../hadoop/api/CarbonTableInputFormat.java      |   4 +-
 .../spark/sql/profiler/ProfilerSuite.scala      | 149 ++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 192 +++++++---
 .../apache/spark/sql/profiler/Profiler.scala    | 250 +++++++++++++
 .../spark/sql/profiler/ProfilerListener.scala   |  49 +++
 .../spark/sql/profiler/ProfilerLogger.scala     | 368 +++++++++++++++++++
 .../org/apache/spark/sql/CarbonSession.scala    |  46 ++-
 .../spark/sql/SparkUnknownExpression.scala      |   4 +
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  11 +-
 45 files changed, 1581 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java
index 0e73410..0a86b5c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java
@@ -38,7 +38,8 @@ public abstract class BinaryExpression extends Expression {
     return right;
   }
 
-  @Override public void findAndSetChild(Expression oldExpr, Expression 
newExpr) {
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
     for (int i = 0; i < children.size(); i++) {
       if (oldExpr.equals(children.get(i))) {
         if (this.left.equals(children.get(i))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
index 5cd2d34..39ad312 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java
@@ -98,20 +98,28 @@ public class ColumnExpression extends LeafExpression {
     return dataType;
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value) {
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
     return new ExpressionResult(dataType, (null == value ? null : 
value.getVal(colIndex)));
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     // TODO Auto-generated method stub
     return null;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     // TODO Auto-generated method stub
     return "ColumnExpression(" + columnName + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return columnName;
+  }
+
   public CarbonColumn getCarbonColumn() {
     return carbonColumn;
   }
@@ -120,7 +128,8 @@ public class ColumnExpression extends LeafExpression {
     this.carbonColumn = carbonColumn;
   }
 
-  @Override public void findAndSetChild(Expression oldExpr, Expression 
newExpr) {
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
index da0df05..13acc63 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java
@@ -51,4 +51,5 @@ public abstract class Expression implements Serializable {
 
   public abstract String getString();
 
+  public abstract String getStatement();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
index b194e3c..14accc1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java
@@ -36,7 +36,8 @@ public class LiteralExpression extends LeafExpression {
     this.dataType = dataType;
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value) {
+  @Override
+  public ExpressionResult evaluate(RowIntf value) {
     return new ExpressionResult(dataType, this.value, true);
   }
 
@@ -44,16 +45,23 @@ public class LiteralExpression extends LeafExpression {
     return new ExpressionResult(dataType, this.value, true);
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     // TODO Auto-generated method stub
     return ExpressionType.LITERAL;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     // TODO Auto-generated method stub
     return "LiteralExpression(" + value + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return value == null ? null : value.toString();
+  }
+
   /**
    * getLiteralExpDataType.
    *
@@ -67,7 +75,8 @@ public class LiteralExpression extends LeafExpression {
     return value;
   }
 
-  @Override public void findAndSetChild(Expression oldExpr, Expression 
newExpr) {
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
index 3677b51..3ab2885 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java
@@ -54,4 +54,9 @@ public class MatchExpression extends Expression {
   public String getString() {
     return queryString;
   }
+
+  @Override
+  public String getStatement() {
+    return queryString;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
index faf5bb1..0588f2c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
@@ -40,7 +40,8 @@ public class EqualToExpression extends 
BinaryConditionalExpression {
     this.isNull = isNull;
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult elRes = left.evaluate(value);
     ExpressionResult erRes = right.evaluate(value);
@@ -91,12 +92,22 @@ public class EqualToExpression extends 
BinaryConditionalExpression {
     return val1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.EQUALS;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "EqualTo(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    if (isNull) {
+      return  left.getStatement() + " is " + right.getStatement();
+    } else {
+      return  left.getStatement() + " = " + right.getStatement();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
index 24575d2..c15ae62 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
@@ -74,11 +74,18 @@ public class GreaterThanEqualToExpression extends 
BinaryConditionalExpression {
     return exprResVal1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.GREATERTHAN_EQUALTO;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "GreaterThanEqualTo(" + left.getString() + ',' + right.getString() 
+ ')';
   }
+
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " >= " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
index ddc3d30..aecd782 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
@@ -34,7 +34,8 @@ public class GreaterThanExpression extends 
BinaryConditionalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult exprLeftRes = left.evaluate(value);
     ExpressionResult exprRightRes = right.evaluate(value);
@@ -76,12 +77,18 @@ public class GreaterThanExpression extends 
BinaryConditionalExpression {
     return val1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.GREATERTHAN;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "GreaterThan(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " > " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
index a560cc3..11d538e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
@@ -38,7 +38,8 @@ public class InExpression extends BinaryConditionalExpression 
{
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult leftRsult = left.evaluate(value);
 
@@ -93,12 +94,18 @@ public class InExpression extends 
BinaryConditionalExpression {
     return leftRsult;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.IN;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "IN(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " in " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
index df7d791..6fee008 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
@@ -74,13 +74,19 @@ public class LessThanEqualToExpression extends 
BinaryConditionalExpression {
     return exprResValue1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     // TODO Auto-generated method stub
     return ExpressionType.LESSTHAN_EQUALTO;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "LessThanEqualTo(" + left.getString() + ',' + right.getString() + 
')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " <= " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
index f4b7f7c..4e4c240 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
@@ -78,12 +78,18 @@ public class LessThanExpression extends 
BinaryConditionalExpression {
     return val1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.LESSTHAN;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "LessThan(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " < " + right.getStatement();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
index 1c00b14..32b5028 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java
@@ -34,7 +34,8 @@ public class ListExpression extends Expression {
     this.children = children;
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value) throws 
FilterUnsupportedException {
+  @Override
+  public ExpressionResult evaluate(RowIntf value) throws 
FilterUnsupportedException {
     List<ExpressionResult> listOfExprRes = new ArrayList<ExpressionResult>(10);
 
     for (Expression expr : children) {
@@ -47,11 +48,13 @@ public class ListExpression extends Expression {
     return new ExpressionResult(listOfExprRes);
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.LIST;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     StringBuffer value = new StringBuffer();
     value.append("ListExpression(");
     for (Expression expr : children) {
@@ -62,6 +65,19 @@ public class ListExpression extends Expression {
     return  value.toString();
   }
 
-  @Override public void findAndSetChild(Expression oldExpr, Expression 
newExpr) {
+  @Override
+  public String getStatement() {
+    StringBuffer value = new StringBuffer();
+    value.append("(");
+    for (Expression expr : children) {
+      value.append(expr.getString()).append(";");
+    }
+    value.append(')');
+
+    return value.toString();
+  }
+
+  @Override
+  public void findAndSetChild(Expression oldExpr, Expression newExpr) {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index 5046722..eae8019 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -39,7 +39,8 @@ public class NotEqualsExpression extends 
BinaryConditionalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult elRes = left.evaluate(value);
     ExpressionResult erRes = right.evaluate(value);
@@ -87,11 +88,18 @@ public class NotEqualsExpression extends 
BinaryConditionalExpression {
     return val1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.NOT_EQUALS;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "NotEquals(" + left.getString() + ',' + right.getString() + ')';
   }
+
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " <> " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
index c4a2fc8..cbc2995 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
@@ -38,7 +38,8 @@ public class NotInExpression extends 
BinaryConditionalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
 
     // Both left and right result need to be checked for null because 
NotInExpression is basically
@@ -104,12 +105,18 @@ public class NotInExpression extends 
BinaryConditionalExpression {
     return leftRsult;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.NOT_IN;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "NOT IN(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " not in " + right.getStatement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
index 18c7374..cabcaf2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java
@@ -33,7 +33,8 @@ public class StartsWithExpression extends 
BinaryConditionalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult exprLeftRes = left.evaluate(value);
     ExpressionResult exprRightRes = right.evaluate(value);
@@ -61,12 +62,18 @@ public class StartsWithExpression extends 
BinaryConditionalExpression {
     return val1;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.STARTSWITH;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "StartsWith(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " like '" + right.getStatement() + "%'";
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java
index 4a3508b..aa01998 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java
@@ -33,7 +33,8 @@ public class AndExpression extends BinaryLogicalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult resultLeft = left.evaluate(value);
     ExpressionResult resultRight = right.evaluate(value);
@@ -46,14 +47,20 @@ public class AndExpression extends BinaryLogicalExpression {
     return resultLeft;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     // TODO Auto-generated method stub
     return ExpressionType.AND;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     // TODO Auto-generated method stub
     return "And(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return "(" + left.getStatement() + " and " + right.getStatement() + ")";
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java
index f62980b..5e8010c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java
@@ -50,7 +50,8 @@ public class FalseExpression  extends 
BinaryConditionalExpression {
    * @throws FilterUnsupportedException
    * @throws FilterIllegalMemberException
    */
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     return new ExpressionResult(DataTypes.BOOLEAN,false);
   }
@@ -59,10 +60,17 @@ public class FalseExpression  extends 
BinaryConditionalExpression {
    * This method will return the expression types
    * @return
    */
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.FALSE;
   }
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return null;
   }
+
+  @Override
+  public String getStatement() {
+    return "false";
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
index bd10f10..148081a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java
@@ -33,7 +33,8 @@ public class OrExpression extends BinaryLogicalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterIllegalMemberException, FilterUnsupportedException {
     ExpressionResult resultLeft = left.evaluate(value);
     ExpressionResult resultRight = right.evaluate(value);
@@ -46,12 +47,18 @@ public class OrExpression extends BinaryLogicalExpression {
     return resultLeft;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.OR;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return "Or(" + left.getString() + ',' + right.getString() + ')';
   }
 
+  @Override
+  public String getStatement() {
+    return "(" + left.getString() + " or " + right.getString() + ")";
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
index 01e3270..5d09fb1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java
@@ -40,7 +40,8 @@ public class RangeExpression extends 
BinaryConditionalExpression {
     super(left, right);
   }
 
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     ExpressionResult resultLeft = left.evaluate(value);
     ExpressionResult resultRight = right.evaluate(value);
@@ -53,15 +54,23 @@ public class RangeExpression extends 
BinaryConditionalExpression {
     return resultLeft;
   }
 
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.RANGE;
   }
 
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return null;
   }
 
-  @Override public List<ExpressionResult> getLiterals() {
+  @Override
+  public String getStatement() {
+    return left.getStatement() + " between " + right.getStatement();
+  }
+
+  @Override
+  public List<ExpressionResult> getLiterals() {
     List<ExpressionResult> listOfExp =
         new 
ArrayList<ExpressionResult>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     getLiteralsResult(this, listOfExp);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java
index 39db1d2..3948589 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java
@@ -48,7 +48,8 @@ public class TrueExpression extends 
BinaryConditionalExpression {
    * @throws FilterUnsupportedException
    * @throws FilterIllegalMemberException
    */
-  @Override public ExpressionResult evaluate(RowIntf value)
+  @Override
+  public ExpressionResult evaluate(RowIntf value)
       throws FilterUnsupportedException, FilterIllegalMemberException {
     return new ExpressionResult(DataTypes.BOOLEAN,true);
   }
@@ -57,10 +58,17 @@ public class TrueExpression extends 
BinaryConditionalExpression {
    * This method will return the expression types
    * @return
    */
-  @Override public ExpressionType getFilterExpressionType() {
+  @Override
+  public ExpressionType getFilterExpressionType() {
     return ExpressionType.TRUE;
   }
-  @Override public String getString() {
+  @Override
+  public String getString() {
     return null;
   }
+
+  @Override
+  public String getStatement() {
+    return "true";
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 4e3665f..47a6bdd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -227,6 +227,10 @@ public class QueryModel {
     return queryId;
   }
 
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
+
   /**
    * @return the tableBlockInfos
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
index 09b0862..130bc43 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java
@@ -40,14 +40,16 @@ public class DriverQueryStatisticsRecorderDummy implements 
QueryStatisticsRecord
 
   }
 
-  public void logStatisticsAsTableExecutor() {
+  public TaskStatistics statisticsForTask(long taskId, long startTime) {
+    return null;
+  }
+
+  public void logStatisticsForTask(TaskStatistics task) {
 
   }
 
   /**
    * Below method will be used to add the statistics
-   *
-   * @param statistic
    */
   public synchronized void recordStatisticsForDriver(QueryStatistic statistic, 
String queryId) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
index b38632f..20536fb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java
@@ -67,7 +67,11 @@ public class DriverQueryStatisticsRecorderImpl implements 
QueryStatisticsRecorde
 
   }
 
-  public void logStatisticsAsTableExecutor() {
+  public TaskStatistics statisticsForTask(long taskId, long startTime) {
+    return null;
+  }
+
+  public void logStatisticsForTask(TaskStatistics task) {
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
index b97b137..5b0bd6a 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java
@@ -109,4 +109,8 @@ public class QueryStatistic implements Serializable {
     return this.count;
   }
 
+  public long getStartTime() {
+    return this.startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
index a52f70b..69bd485 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java
@@ -25,7 +25,9 @@ public interface QueryStatisticsRecorder {
 
   void logStatistics();
 
-  void logStatisticsAsTableExecutor();
+  TaskStatistics statisticsForTask(long taskId, long startTime);
+
+  void logStatisticsForTask(TaskStatistics task);
 
   void recordStatisticsForDriver(QueryStatistic statistic, String queryId);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
index 57a5f5b..e312f53 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java
@@ -48,10 +48,11 @@ public class QueryStatisticsRecorderDummy implements 
QueryStatisticsRecorder,Ser
 
   }
 
-  /**
-   * Below method will be used to show statistic log as table
-   */
-  public void logStatisticsAsTableExecutor() {
+  public TaskStatistics statisticsForTask(long taskId, long startTime) {
+    return null;
+  }
+
+  public void logStatisticsForTask(TaskStatistics task) {
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
index 9807eaa..78ebd09 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -23,8 +23,6 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 
-import static org.apache.carbondata.core.util.CarbonUtil.printLine;
-
 /**
  * Class will be used to record and log the query statistics
  */
@@ -46,13 +44,13 @@ public class QueryStatisticsRecorderImpl implements 
QueryStatisticsRecorder, Ser
   private List<QueryStatistic> queryStatistics;
 
   /**
-   * query with taskd
+   * query id with task
    */
-  private String queryIWthTask;
+  private String queryId;
 
   public QueryStatisticsRecorderImpl(String queryId) {
     queryStatistics = new ArrayList<QueryStatistic>();
-    this.queryIWthTask = queryId;
+    this.queryId = queryId;
   }
 
   /**
@@ -69,122 +67,25 @@ public class QueryStatisticsRecorderImpl implements 
QueryStatisticsRecorder, Ser
    */
   public void logStatistics() {
     for (QueryStatistic statistic : queryStatistics) {
-      LOGGER.statistic(statistic.getStatistics(queryIWthTask));
+      LOGGER.statistic(statistic.getStatistics(queryId));
     }
   }
 
   /**
    * Below method will be used to show statistic log as table
    */
-  public void logStatisticsAsTableExecutor() {
-    String tableInfo = collectExecutorStatistics();
-    if (null != tableInfo) {
-      LOGGER.statistic(tableInfo);
+  public void logStatisticsForTask(TaskStatistics result) {
+    if (null != result) {
+      LOGGER.statistic("Print query statistic for each task id:" + "\n" + 
result.toString());
     }
   }
 
-  /**
-   * Below method will parse queryStatisticsMap and put time into table
-   */
-  public String collectExecutorStatistics() {
-    long load_blocks_time = 0;
-    long scan_blocks_time = 0;
-    long scan_blocks_num = 0;
-    long load_dictionary_time = 0;
-    long result_size = 0;
-    long total_executor_time = 0;
-    long total_blocklet = 0;
-    long valid_scan_blocklet = 0;
-    long valid_pages_blocklet = 0;
-    long total_pages = 0;
-    long readTime = 0;
-    long scannedPages = 0;
+  public TaskStatistics statisticsForTask(long taskId, long startTime) {
     try {
-      for (QueryStatistic statistic : queryStatistics) {
-        if (statistic.getMessage() != null) {
-          switch (statistic.getMessage()) {
-            case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
-              load_blocks_time += statistic.getTimeTaken();
-              break;
-            case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
-              scan_blocks_time += statistic.getCount();
-              break;
-            case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
-              scan_blocks_num += statistic.getCount();
-              break;
-            case QueryStatisticsConstants.LOAD_DICTIONARY:
-              load_dictionary_time += statistic.getTimeTaken();
-              break;
-            case QueryStatisticsConstants.RESULT_SIZE:
-              result_size += statistic.getCount();
-              break;
-            case QueryStatisticsConstants.EXECUTOR_PART:
-              total_executor_time += statistic.getTimeTaken();
-              break;
-            case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
-              total_blocklet = statistic.getCount();
-              break;
-            case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
-              valid_scan_blocklet = statistic.getCount();
-              break;
-            case QueryStatisticsConstants.VALID_PAGE_SCANNED:
-              valid_pages_blocklet = statistic.getCount();
-              break;
-            case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
-              total_pages = statistic.getCount();
-              break;
-            case QueryStatisticsConstants.READ_BLOCKlET_TIME:
-              readTime = statistic.getCount();
-              break;
-            case QueryStatisticsConstants.PAGE_SCANNED:
-              scannedPages = statistic.getCount();
-              break;
-            default:
-              break;
-          }
-        }
-      }
-      String headers =
-          
"task_id,load_blocks_time,load_dictionary_time,carbon_scan_time,carbon_IO_time, 
"
-              + "total_executor_time,scan_blocks_num,total_blocklets,"
-              + 
"valid_blocklets,total_pages,scanned_pages,valid_pages,result_size";
-      List<String> values = new ArrayList<String>();
-      values.add(queryIWthTask);
-      values.add(load_blocks_time + "ms");
-      values.add(load_dictionary_time + "ms");
-      values.add(scan_blocks_time + "ms");
-      values.add(readTime + "ms");
-      values.add(total_executor_time + "ms");
-      values.add(String.valueOf(scan_blocks_num));
-      values.add(String.valueOf(total_blocklet));
-      values.add(String.valueOf(valid_scan_blocklet));
-      values.add(String.valueOf(total_pages));
-      values.add(String.valueOf(scannedPages));
-      values.add(String.valueOf(valid_pages_blocklet));
-      values.add(String.valueOf(result_size));
-      StringBuilder tableInfo = new StringBuilder();
-      String[] columns = headers.split(",");
-      StringBuilder line = new StringBuilder("");
-      StringBuilder hearLine = new StringBuilder("");
-      StringBuilder valueLine = new StringBuilder("");
-      for (int i = 0; i < columns.length; i++) {
-        int len = Math.max(columns[i].length(), values.get(i).length());
-        line.append("+").append(printLine("-", len));
-        hearLine.append("|").append(printLine(" ", len - columns[i].length()))
-            .append(columns[i]);
-        valueLine.append("|").append(printLine(" ", len - 
values.get(i).length()))
-            .append(values.get(i));
-      }
-      // struct table info
-      tableInfo.append(line).append("+").append("\n");
-      tableInfo.append(hearLine).append("|").append("\n");
-      tableInfo.append(line).append("+").append("\n");
-      tableInfo.append(valueLine).append("|").append("\n");
-      tableInfo.append(line).append("+").append("\n");
-      return "Print query statistic for each task id:" + "\n" + 
tableInfo.toString();
+      return new TaskStatistics(queryId, taskId).build(startTime, 
queryStatistics);
     } catch (Exception ex) {
       LOGGER.error(ex);
-      return "Put statistics into table failed, catch exception: " + 
ex.getMessage();
+      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java 
b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
new file mode 100644
index 0000000..7e4d89f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.stats;
+
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.util.CarbonUtil;
+
[email protected]("stats")
[email protected]
+public class TaskStatistics implements Serializable {
+
+  private static final Column[] columns = {
+      new Column("query_id", "query id"),
+      new Column("task_id", "spark task id"),
+      new Column("start_time", "start time"),
+      new Column("total_time", QueryStatisticsConstants.EXECUTOR_PART, true),
+      new Column("load_blocks_time", 
QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, true),
+      new Column("load_dictionary_time", 
QueryStatisticsConstants.LOAD_DICTIONARY, true),
+      new Column("carbon_scan_time", 
QueryStatisticsConstants.SCAN_BLOCKlET_TIME, true),
+      new Column("carbon_IO_time", 
QueryStatisticsConstants.READ_BLOCKlET_TIME, true),
+      new Column("scan_blocks_num", QueryStatisticsConstants.SCAN_BLOCKS_NUM),
+      new Column("total_blocklets", 
QueryStatisticsConstants.TOTAL_BLOCKLET_NUM),
+      new Column("valid_blocklets", 
QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM),
+      new Column("total_pages", QueryStatisticsConstants.TOTAL_PAGE_SCANNED),
+      new Column("scanned_pages", QueryStatisticsConstants.PAGE_SCANNED),
+      new Column("valid_pages", QueryStatisticsConstants.VALID_PAGE_SCANNED),
+      new Column("result_size", QueryStatisticsConstants.RESULT_SIZE)
+  };
+
+  private static final int numOfColumns = columns.length;
+
+  private String queryId;
+
+  private long[] values = new long[numOfColumns];
+
+  private long fileSize;
+
+  private String[] files;
+
+  TaskStatistics(String queryId, long taskId) {
+    this.queryId = queryId;
+    this.values[1] = taskId;
+  }
+
+  public TaskStatistics(String queryId, long[] values, long fileSize, String[] 
files) {
+    this.queryId = queryId;
+    this.values = values;
+    this.fileSize = fileSize;
+    this.files = files;
+  }
+
+  public TaskStatistics build(long startTime, List<QueryStatistic> 
queryStatistics) {
+    this.values[2] = startTime;
+    for (QueryStatistic statistic : queryStatistics) {
+      if (statistic.getMessage() != null) {
+        for (int columnIndex = 3; columnIndex <= numOfColumns - 1; 
columnIndex++) {
+          if (columns[columnIndex].comment.equals(statistic.getMessage())) {
+            if (columns[columnIndex].isDuration) {
+              values[columnIndex] += statistic.getTimeTaken();
+            } else {
+              values[columnIndex] += statistic.getCount();
+            }
+            break;
+          }
+        }
+      }
+    }
+    return this;
+  }
+
+  @Override public String toString() {
+    StringBuilder builder = new StringBuilder();
+    printStatisticTable(Arrays.asList(this), builder, "");
+    return builder.toString();
+  }
+
+  public static void printStatisticTable(List<TaskStatistics> stats, 
StringBuilder builder,
+      String indent) {
+    int numOfRows = stats.size();
+    int numOfColumns = columns.length;
+
+    // header as string[]
+    String[] header = new String[numOfColumns];
+    for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+      header[columnIndex] = columns[columnIndex].name;
+    }
+
+    // convert rows to string[][]
+    String[][] rows = new String[numOfRows][];
+    for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) {
+      rows[rowIndex] = stats.get(rowIndex).convertValueToString();
+    }
+
+    CarbonUtil.logTable(builder, header, rows, indent);
+  }
+
+  private String[] convertValueToString() {
+    String[] valueStrings = new String[numOfColumns];
+    valueStrings[0] = queryId;
+    for (int i = 1; i < numOfColumns; i++) {
+      if (columns[i].isDuration) {
+        valueStrings[i] = String.valueOf(values[i]) + "ms";
+      } else {
+        valueStrings[i] = String.valueOf(values[i]);
+      }
+    }
+    valueStrings[2] = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS").format(values[2]);
+    return valueStrings;
+  }
+
+  private static class Column {
+    String name;
+    String comment;
+    boolean isDuration;
+
+    Column(String name, String comment) {
+      this.name = name;
+      this.comment = comment;
+      this.isDuration = false;
+    }
+
+    Column(String name, String comment, boolean isDuration) {
+      this(name, comment);
+      this.isDuration = isDuration;
+    }
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public long[] getValues() {
+    return values;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public String[] getFiles() {
+    return files;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 5ac359b..4d8caed 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -226,6 +226,7 @@ public final class CarbonProperties {
     validateSortMemorySizeInMB();
     validateWorkingMemory();
     validateSortStorageMemory();
+    validateEnableQueryStatistics();
   }
 
   /**
@@ -1365,6 +1366,27 @@ public final class CarbonProperties {
         unsafeSortStorageMemory + "");
   }
 
+  private void validateEnableQueryStatistics() {
+    String enableQueryStatistics = carbonProperties.getProperty(
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT);
+    boolean isValidBooleanValue = 
CarbonUtil.validateBoolean(enableQueryStatistics);
+    if (!isValidBooleanValue) {
+      LOGGER.warn("The enable query statistics value \"" + 
enableQueryStatistics
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT);
+      
carbonProperties.setProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+          CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT);
+    }
+  }
+
+  public boolean isEnableQueryStatistics() {
+    String enableQueryStatistics = carbonProperties.getProperty(
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT);
+    return enableQueryStatistics.equalsIgnoreCase("true");
+  }
+
   /**
    * Get the heap memory pooling threshold bytes.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 84d3c6c..1082d78 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1428,6 +1428,107 @@ public final class CarbonUtil {
   }
 
   /**
+   * append a string with left pad to the string builder
+   */
+  public static void leftPad(StringBuilder builder, String a, int length, char 
pad) {
+    if (builder == null || a == null) {
+      return;
+    }
+    int padLength = length - a.length();
+    if (padLength > 0) {
+      for (int i = 0; i < padLength; i++) {
+        builder.append(pad);
+      }
+    }
+    if (a.length() > 0) {
+      builder.append(a);
+    }
+  }
+
+  /**
+   * append a string with right pad to the string builder
+   */
+  public static void rightPad(StringBuilder builder, String a, int length, 
char pad) {
+    if (builder == null || a == null) {
+      return;
+    }
+    int padLength = length - a.length();
+    if (a.length() > 0) {
+      builder.append(a);
+    }
+    if (padLength > 0) {
+      for (int i = 0; i < padLength; i++) {
+        builder.append(pad);
+      }
+    }
+  }
+
+  /**
+   * log information as table
+   */
+  public static void logTable(StringBuilder builder, String[] header, 
String[][] rows,
+      String indent) {
+    int numOfRows = rows.length;
+    int numOfColumns = header.length;
+
+    // calculate max length of each column
+    int[] maxLengths = new int[numOfColumns];
+    for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+      maxLengths[columnIndex] = header[columnIndex].length();
+    }
+    for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) {
+      for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+        maxLengths[columnIndex] =
+            Math.max(maxLengths[columnIndex], 
rows[rowIndex][columnIndex].length());
+      }
+    }
+
+    // build line
+    StringBuilder line = new StringBuilder("+");
+    for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+      CarbonUtil.leftPad(line, "", maxLengths[columnIndex], '-');
+      line.append("+");
+    }
+
+    // append head
+    
builder.append(indent).append(line).append("\n").append(indent).append("|");
+    for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+      CarbonUtil.rightPad(builder, header[columnIndex], 
maxLengths[columnIndex], ' ');
+      builder.append("|");
+    }
+    builder.append("\n").append(indent).append(line);
+
+    // append rows
+    for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) {
+      builder.append("\n").append(indent).append("|");
+      for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) {
+        CarbonUtil.leftPad(builder, rows[rowIndex][columnIndex], 
maxLengths[columnIndex], ' ');
+        builder.append("|");
+      }
+      builder.append("\n").append(indent).append(line);
+    }
+  }
+
+  public static void logTable(StringBuilder builder, String context, String 
indent) {
+    String[] rows = context.split("\n");
+    int maxLength = 0;
+    for (String row: rows) {
+      maxLength = Math.max(maxLength, row.length());
+    }
+    StringBuilder line = new StringBuilder("+");
+    CarbonUtil.rightPad(line, "", maxLength, '-');
+    line.append("+");
+
+    builder.append(indent).append(line);
+    for (String row: rows) {
+      builder.append("\n").append(indent).append("|");
+      CarbonUtil.rightPad(builder, row, maxLength, ' ');
+      builder.append("|");
+    }
+    builder.append("\n").append(indent).append(line);
+  }
+
+  /**
    * Below method will be used to get the list of values in
    * comma separated string format
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java
 
b/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java
index 39195b3..cfbe7a7 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java
@@ -98,11 +98,10 @@ public class QueryStasticsRecorderImplTest {
         .addStatistics(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, 5L);
     queryStasticsRecorderImpl.logStatistics();
     
queryStasticsRecorderImpl.recordStatistics(queryStatisticWithVALID_SCAN_BLOCKLET_NUM);
-    queryStasticsRecorderImpl.logStatisticsAsTableExecutor();
   }
 
   @Test public void testcollectExecutorStatistics() {
-    assertNotNull(queryStasticsRecorderImpl.collectExecutorStatistics());
+    assertNotNull(queryStasticsRecorderImpl.statisticsForTask(1, 1));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/examples/spark2/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/log4j.properties 
b/examples/spark2/src/main/resources/log4j.properties
new file mode 100644
index 0000000..4db4f7a
--- /dev/null
+++ b/examples/spark2/src/main/resources/log4j.properties
@@ -0,0 +1,7 @@
+log4j.logger.org.apache.spark.sql.profiler.ProfilerLogger$=INFO,F1
+log4j.appender.F1=org.apache.log4j.RollingFileAppender
+log4j.appender.F1.File=${path.target}/profiler.log
+log4j.appender.F1.MaxFileSize=4024KB
+log4j.appender.F1.MaxBackupIndex=20
+log4j.appender.F1.layout=org.apache.log4j.PatternLayout
+log4j.appender.F1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 7a15327..d97a9f0 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -19,11 +19,27 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
+import org.apache.log4j.PropertyConfigurator
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 object CarbonSessionExample {
 
   def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    System.setProperty("path.target", s"$rootPath/examples/spark2/target")
+    // print profiler log to a separated file: target/profiler.log
+    PropertyConfigurator.configure(
+      s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
-    spark.sparkContext.setLogLevel("WARN")
+    spark.sparkContext.setLogLevel("INFO")
 
     spark.sql("DROP TABLE IF EXISTS carbon_table")
 
@@ -40,15 +56,12 @@ object CarbonSessionExample {
          | decimalField DECIMAL(18,2),
          | dateField DATE,
          | charField CHAR(5),
-         | floatField FLOAT,
-         | complexData ARRAY<STRING>
+         | floatField FLOAT
          | )
          | STORED BY 'carbondata'
-         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, 
charField')
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
        """.stripMargin)
 
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
     val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
 
     // scalastyle:off

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 7c06b60..448cf28 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -88,7 +88,7 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Writable {
     this.length = length;
   }
 
-  private void calculateLength() {
+  public void calculateLength() {
     long total = 0;
     if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) {
       Map<String, Long> blockSizes = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index c0366d2..c057129 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -158,6 +158,7 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo 
partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
+    numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
     UpdateVO invalidBlockVOForSegmentId = null;
     Boolean isIUDTable = false;
@@ -173,6 +174,7 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
     List<CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, 
matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
+    numBlocks = dataBlocksOfSegment.size();
     for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 
       // Get the UpdateVO for those tables on which IUD operations being 
performed.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 5506af6..8d2318b 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -107,6 +107,23 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
   private static final String PARTITIONS_TO_PRUNE =
       "mapreduce.input.carboninputformat.partitions.to.prune";
 
+  // record segment number and hit blocks
+  protected int numSegments = 0;
+  protected int numStreamSegments = 0;
+  protected int numBlocks = 0;
+
+  public int getNumSegments() {
+    return numSegments;
+  }
+
+  public int getNumStreamSegments() {
+    return numStreamSegments;
+  }
+
+  public int getNumBlocks() {
+    return numBlocks;
+  }
+
   /**
    * Set the `tableInfo` in `configuration`
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 290b3d7..14f4c71 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -284,7 +284,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       List<Segment> streamSegments) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
-
+      numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
@@ -459,6 +459,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       List<Integer> oldPartitionIdList, SegmentUpdateStatusManager 
updateStatusManager)
       throws IOException {
 
+    numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
     UpdateVO invalidBlockVOForSegmentId = null;
     Boolean isIUDTable = false;
@@ -472,6 +473,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, 
matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
+    numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : 
dataBlocksOfSegment) {
 
       // Get the UpdateVO for those tables on which IUD operations being 
performed.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala
new file mode 100644
index 0000000..998e7a9
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.profiler
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class ProfilerSuite extends QueryTest with BeforeAndAfterAll {
+  var setupEndpointRef: RpcEndpointRef = _
+  var statementMessages: ArrayBuffer[ProfilerMessage] = _
+  var executionMessages: ArrayBuffer[ProfilerMessage] = _
+  val profilerEndPoint = new ProfilerEndPoint
+  val listener = new ProfilerListener
+
+  override def beforeAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+      "true"
+    )
+    Profiler.setIsEnable(true)
+    setupEndpointRef = SparkEnv.get.rpcEnv.setupEndpoint(
+      "CarbonProfiler",
+      new ProfilerEndPoint() {
+        // override method for testing
+        override def processSQLStart(statementId: Long,
+            messages: ArrayBuffer[ProfilerMessage]): Unit = {
+          statementMessages = messages
+        }
+        // override method for testing
+        override def processExecutionEnd(executionId: Long,
+            messages: ArrayBuffer[ProfilerMessage]): Unit = {
+          executionMessages = messages
+        }
+      })
+    sqlContext.sparkContext.addSparkListener(listener)
+  }
+
+  def cleanMessages(): Unit = {
+    statementMessages = null
+    executionMessages = null
+  }
+
+  def processSQLStart(statementId: Long,
+      messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    try {
+      profilerEndPoint.processSQLStart(statementId, messages)
+    } catch {
+      case _ =>
+        assert(false, "Failed to log StatementSummary")
+    }
+  }
+
+  def processExecutionEnd(executionId: Long,
+      messages: ArrayBuffer[ProfilerMessage]): Unit = {
+    try {
+      profilerEndPoint.processExecutionEnd(executionId, messages)
+    } catch {
+      case _ =>
+        assert(false, "Failed to log ExecutionSummary")
+    }
+  }
+
+  def checkCommand(sqlText: String): Unit = {
+    sql(sqlText)
+    Thread.sleep(1000)
+    assertResult(1)(statementMessages.length)
+    assert(statementMessages(0).isInstanceOf[SQLStart])
+    val dropSQLStart = statementMessages(0).asInstanceOf[SQLStart]
+    assertResult(sqlText)(dropSQLStart.sqlText)
+    processSQLStart(dropSQLStart.statementId, statementMessages)
+    cleanMessages()
+  }
+
+  def checkSelectQuery(
+      sqlText: String,
+      executionMsgCount: Int
+  ): Unit = {
+    sql(sqlText).collect()
+    Thread.sleep(1000)
+    assertResult(2)(statementMessages.length)
+    assert(statementMessages(0).isInstanceOf[SQLStart])
+    assert(statementMessages(1).isInstanceOf[Optimizer])
+    val sqlStart = statementMessages(0).asInstanceOf[SQLStart]
+    assertResult(sqlText)(sqlStart.sqlText)
+    processSQLStart(sqlStart.statementId, statementMessages)
+    assertResult(executionMsgCount)(executionMessages.length)
+    assert(executionMessages(0).isInstanceOf[ExecutionStart])
+    assert(executionMessages.exists(_.isInstanceOf[GetPartition]))
+    assert(executionMessages.exists(_.isInstanceOf[QueryTaskEnd]))
+    assert(executionMessages(executionMsgCount - 1).isInstanceOf[ExecutionEnd])
+    val executionStart = executionMessages(0).asInstanceOf[ExecutionStart]
+    processExecutionEnd(executionStart.executionId, executionMessages)
+    cleanMessages()
+  }
+
+  test("collect messages to driver side") {
+    // drop table
+    checkCommand("DROP TABLE IF EXISTS mobile")
+    checkCommand("DROP TABLE IF EXISTS emp")
+    // create table
+    checkCommand("CREATE TABLE mobile (mid string,mobileId string, color 
string, id int) STORED BY 'carbondata' 
TBLPROPERTIES('DICTIONARY_EXCLUDE'='Color')")
+    checkCommand("CREATE TABLE emp (eid string,eName string, mobileId 
string,color string, id int) STORED BY 'carbondata' 
TBLPROPERTIES('DICTIONARY_EXCLUDE'='Color')")
+    // load data
+    checkCommand(s"LOAD DATA LOCAL INPATH '$resourcesPath/join/mobile.csv' 
INTO TABLE mobile OPTIONS('FILEHEADER'='mid,mobileId,color,id')")
+    checkCommand(s"LOAD DATA LOCAL INPATH '$resourcesPath/join/employee.csv' 
INTO TABLE emp OPTIONS('FILEHEADER'='eid,eName,mobileId,color,id')")
+    // select query
+    checkSelectQuery("SELECT * FROM mobile where mid = 'mid89'", 4)
+    checkSelectQuery("SELECT * FROM emp where eid = 'empid292'", 4)
+    checkSelectQuery("SELECT * FROM emp JOIN mobile ON 
emp.mobileId=mobile.mobileId", 6)
+    // drop table
+    checkCommand("DROP TABLE IF EXISTS mobile")
+    checkCommand("DROP TABLE IF EXISTS emp")
+  }
+
+  override def afterAll(): Unit = {
+    SparkEnv.get.rpcEnv.stop(setupEndpointRef)
+    sqlContext.sparkContext.listenerBus.removeListener(listener)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT
+    )
+    Profiler.setIsEnable(false)
+    sql("DROP TABLE IF EXISTS mobile")
+    sql("DROP TABLE IF EXISTS emp")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 9d2b6e5..2be0efc 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -35,6 +35,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -86,65 +88,111 @@ class CarbonScanRDD(
   @transient val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getName)
 
   override def getPartitions: Array[Partition] = {
-    val conf = new Configuration()
-    val jobConf = new JobConf(conf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val job = Job.getInstance(jobConf)
-    val fileLevelExternal = 
tableInfo.getFactTable().getTableProperties().get("_filelevelformat")
-    val format = if (fileLevelExternal != null && 
fileLevelExternal.equalsIgnoreCase("true")) {
-      prepareFileInputFormatForDriver(job.getConfiguration)
-    } else {
-      prepareInputFormatForDriver(job.getConfiguration)
-    }
-    // initialise query_id for job
-    job.getConfiguration.set("query.id", queryId)
-
-    // get splits
-    val splits = format.getSplits(job)
-    if ((splits == null) && 
format.isInstanceOf[CarbonFileInputFormat[Object]]) {
-      throw new SparkException(
-        "CarbonData file not exist in the segment_null (SDK writer Output) 
path")
-    }
+    val startTime = System.currentTimeMillis()
+    var partitions: Array[Partition] = Array.empty[Partition]
+    var getSplitsStartTime: Long = -1
+    var getSplitsEndTime: Long = -1
+    var distributeStartTime: Long = -1
+    var distributeEndTime: Long = -1
+    val tablePath = tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath
+    var numSegments = 0
+    var numStreamSegments = 0
+    var numBlocks = 0
 
-    // separate split
-    // 1. for batch splits, invoke distributeSplits method to create partitions
-    // 2. for stream splits, create partition for each split by default
-    val columnarSplits = new ArrayList[InputSplit]()
-    val streamSplits = new ArrayBuffer[InputSplit]()
-    splits.asScala.foreach { split =>
-      val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
-      if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
-        streamSplits += split
+    try {
+      val conf = new Configuration()
+      val jobConf = new JobConf(conf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      val job = Job.getInstance(jobConf)
+      val fileLevelExternal = 
tableInfo.getFactTable().getTableProperties().get("_filelevelformat")
+      val format = if (fileLevelExternal != null && 
fileLevelExternal.equalsIgnoreCase("true")) {
+        prepareFileInputFormatForDriver(job.getConfiguration)
       } else {
-        columnarSplits.add(split)
+        prepareInputFormatForDriver(job.getConfiguration)
       }
-    }
-    val batchPartitions = distributeColumnarSplits(columnarSplits)
-    // check and remove InExpression from filterExpression
-    checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
-    if (streamSplits.isEmpty) {
-      batchPartitions.toArray
-    } else {
-      val index = batchPartitions.length
-      val streamPartitions: mutable.Buffer[Partition] =
-        streamSplits.zipWithIndex.map { splitWithIndex =>
-          val multiBlockSplit =
-            new CarbonMultiBlockSplit(
-              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
-              splitWithIndex._1.getLocations,
-              FileFormat.ROW_V1)
-          new CarbonSparkPartition(id, splitWithIndex._2 + index, 
multiBlockSplit)
+      // initialise query_id for job
+      job.getConfiguration.set("query.id", queryId)
+
+      // get splits
+      getSplitsStartTime = System.currentTimeMillis()
+      val splits = format.getSplits(job)
+      getSplitsEndTime = System.currentTimeMillis()
+      if ((splits == null) && 
format.isInstanceOf[CarbonFileInputFormat[Object]]) {
+        throw new SparkException(
+          "CarbonData file not exist in the segment_null (SDK writer Output) 
path")
+      }
+      numSegments = format.getNumSegments
+      numStreamSegments = format.getNumStreamSegments
+      numBlocks = format.getNumBlocks
+
+      // separate split
+      // 1. for batch splits, invoke distributeSplits method to create 
partitions
+      // 2. for stream splits, create partition for each split by default
+      val columnarSplits = new ArrayList[InputSplit]()
+      val streamSplits = new ArrayBuffer[InputSplit]()
+      splits.asScala.foreach { split =>
+        val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
+        if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
+          streamSplits += split
+        } else {
+          columnarSplits.add(split)
         }
-      if (batchPartitions.isEmpty) {
-        streamPartitions.toArray
+      }
+      distributeStartTime = System.currentTimeMillis()
+      val batchPartitions = distributeColumnarSplits(columnarSplits)
+      distributeEndTime = System.currentTimeMillis()
+      // check and remove InExpression from filterExpression
+      checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
+      if (streamSplits.isEmpty) {
+        partitions = batchPartitions.toArray
       } else {
-        logInfo(
-          s"""
-             | Identified no.of Streaming Blocks: ${streamPartitions.size},
+        val index = batchPartitions.length
+        val streamPartitions: mutable.Buffer[Partition] =
+          streamSplits.zipWithIndex.map { splitWithIndex =>
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(
+                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+                splitWithIndex._1.getLocations,
+                FileFormat.ROW_V1)
+            new CarbonSparkPartition(id, splitWithIndex._2 + index, 
multiBlockSplit)
+          }
+        if (batchPartitions.isEmpty) {
+          partitions = streamPartitions.toArray
+        } else {
+          logInfo(
+            s"""
+               | Identified no.of Streaming Blocks: ${ streamPartitions.size },
           """.stripMargin)
-        // should keep the order by index of partition
-        batchPartitions.appendAll(streamPartitions)
-        batchPartitions.toArray
+          // should keep the order by index of partition
+          batchPartitions.appendAll(streamPartitions)
+          partitions = batchPartitions.toArray
+        }
+      }
+      partitions
+    } finally {
+      Profiler.invokeIfEnable {
+        val endTime = System.currentTimeMillis()
+        val executionId = 
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong
+        Profiler.send(
+          GetPartition(
+            executionId,
+            tableInfo.getDatabaseName + "." + 
tableInfo.getFactTable.getTableName,
+            tablePath,
+            queryId,
+            partitions.length,
+            startTime,
+            endTime,
+            getSplitsStartTime,
+            getSplitsEndTime,
+            numSegments,
+            numStreamSegments,
+            numBlocks,
+            distributeStartTime,
+            distributeEndTime,
+            if (filterExpression == null) "" else 
filterExpression.getStatement,
+            if (columnProjection == null) "" else 
columnProjection.getAllColumns.mkString(",")
+          )
+        )
       }
     }
   }
@@ -340,7 +388,8 @@ class CarbonScanRDD(
         System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties"
       )
     }
-
+    val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val taskId = split.index
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, 
split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), 
attemptId)
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
@@ -349,6 +398,8 @@ class CarbonScanRDD(
     inputMetricsStats.initBytesReadCallback(context, inputSplit)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
       val model = format.createQueryModel(inputSplit, attemptContext)
+      // one query id per table
+      model.setQueryId(queryId)
       // get RecordReader by FileFormat
       val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
         case FileFormat.ROW_V1 =>
@@ -388,7 +439,7 @@ class CarbonScanRDD(
       context.addTaskCompletionListener { _ =>
         reader.close()
         close()
-        logStatistics(queryStartTime, model.getStatisticsRecorder)
+        logStatistics(executionId, taskId, queryStartTime, 
model.getStatisticsRecorder, split)
       }
       // initialize the reader
       reader.initialize(inputSplit, attemptContext)
@@ -516,14 +567,40 @@ class CarbonScanRDD(
     format
   }
 
-  def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): 
Unit = {
+  def logStatistics(
+      executionId: String,
+      taskId: Long,
+      queryStartTime: Long,
+      recorder: QueryStatisticsRecorder,
+      split: Partition
+  ): Unit = {
     if (null != recorder) {
       val queryStatistic = new QueryStatistic()
       
queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
         System.currentTimeMillis - queryStartTime)
       recorder.recordStatistics(queryStatistic)
       // print executor query statistics for each task_id
-      recorder.logStatisticsAsTableExecutor()
+      val statistics = recorder.statisticsForTask(taskId, queryStartTime)
+      if (statistics != null) {
+        Profiler.invokeIfEnable {
+          val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+          inputSplit.calculateLength()
+          val size = inputSplit.getLength
+          val files = inputSplit.getAllSplits.asScala.map { s =>
+            s.getSegmentId + "/" + s.getPath.getName
+          }.toArray[String]
+          Profiler.send(
+            QueryTaskEnd(
+              executionId.toLong,
+              queryId,
+              statistics.getValues,
+              size,
+              files
+            )
+          )
+        }
+      }
+      recorder.logStatisticsForTask(statistics)
     }
   }
 
@@ -531,7 +608,6 @@ class CarbonScanRDD(
    * This method will check and remove InExpression from filterExpression to 
prevent the List
    * Expression values from serializing and deserializing on executor
    *
-   * @param format
    * @param identifiedPartitions
    */
   private def checkAndRemoveInExpressinFromFilterExpression(

Reply via email to