[CARBONDATA-2271] Collect SQL execution information to driver side This PR adds support for collecting SQL execution information for profiling purposes. See CarbonSessionExample.scala; it generates a separate log file containing the profiling information.
This closes #2087 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d5bec4dd Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d5bec4dd Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d5bec4dd Branch: refs/heads/master Commit: d5bec4dd7eb9b40fe3f6618093ba28e886e56b25 Parents: e58ca9f Author: QiangCai <[email protected]> Authored: Mon Mar 19 19:49:47 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Sat Mar 24 15:50:03 2018 +0800 ---------------------------------------------------------------------- .../core/scan/expression/BinaryExpression.java | 3 +- .../core/scan/expression/ColumnExpression.java | 17 +- .../core/scan/expression/Expression.java | 1 + .../core/scan/expression/LiteralExpression.java | 17 +- .../core/scan/expression/MatchExpression.java | 5 + .../conditional/EqualToExpression.java | 17 +- .../GreaterThanEqualToExpression.java | 11 +- .../conditional/GreaterThanExpression.java | 13 +- .../expression/conditional/InExpression.java | 13 +- .../conditional/LessThanEqualToExpression.java | 10 +- .../conditional/LessThanExpression.java | 10 +- .../expression/conditional/ListExpression.java | 24 +- .../conditional/NotEqualsExpression.java | 14 +- .../expression/conditional/NotInExpression.java | 13 +- .../conditional/StartsWithExpression.java | 13 +- .../scan/expression/logical/AndExpression.java | 13 +- .../expression/logical/FalseExpression.java | 14 +- .../scan/expression/logical/OrExpression.java | 13 +- .../expression/logical/RangeExpression.java | 17 +- .../scan/expression/logical/TrueExpression.java | 14 +- .../carbondata/core/scan/model/QueryModel.java | 4 + .../DriverQueryStatisticsRecorderDummy.java | 8 +- .../DriverQueryStatisticsRecorderImpl.java | 6 +- .../carbondata/core/stats/QueryStatistic.java | 4 + .../core/stats/QueryStatisticsRecorder.java | 4 +- .../stats/QueryStatisticsRecorderDummy.java | 9 +- 
.../core/stats/QueryStatisticsRecorderImpl.java | 119 +----- .../carbondata/core/stats/TaskStatistics.java | 164 +++++++++ .../carbondata/core/util/CarbonProperties.java | 22 ++ .../apache/carbondata/core/util/CarbonUtil.java | 101 +++++ .../stats/QueryStasticsRecorderImplTest.java | 3 +- .../spark2/src/main/resources/log4j.properties | 7 + .../examples/CarbonSessionExample.scala | 25 +- .../hadoop/CarbonMultiBlockSplit.java | 2 +- .../hadoop/api/CarbonFileInputFormat.java | 2 + .../hadoop/api/CarbonInputFormat.java | 17 + .../hadoop/api/CarbonTableInputFormat.java | 4 +- .../spark/sql/profiler/ProfilerSuite.scala | 149 ++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 192 +++++++--- .../apache/spark/sql/profiler/Profiler.scala | 250 +++++++++++++ .../spark/sql/profiler/ProfilerListener.scala | 49 +++ .../spark/sql/profiler/ProfilerLogger.scala | 368 +++++++++++++++++++ .../org/apache/spark/sql/CarbonSession.scala | 46 ++- .../spark/sql/SparkUnknownExpression.scala | 4 + .../sql/optimizer/CarbonLateDecodeRule.scala | 11 +- 45 files changed, 1581 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java index 0e73410..0a86b5c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/BinaryExpression.java @@ -38,7 +38,8 @@ public abstract class BinaryExpression extends Expression { return right; } - @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) { + @Override + public void 
findAndSetChild(Expression oldExpr, Expression newExpr) { for (int i = 0; i < children.size(); i++) { if (oldExpr.equals(children.get(i))) { if (this.left.equals(children.get(i))) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java index 5cd2d34..39ad312 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ColumnExpression.java @@ -98,20 +98,28 @@ public class ColumnExpression extends LeafExpression { return dataType; } - @Override public ExpressionResult evaluate(RowIntf value) { + @Override + public ExpressionResult evaluate(RowIntf value) { return new ExpressionResult(dataType, (null == value ? 
null : value.getVal(colIndex))); } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { // TODO Auto-generated method stub return null; } - @Override public String getString() { + @Override + public String getString() { // TODO Auto-generated method stub return "ColumnExpression(" + columnName + ')'; } + @Override + public String getStatement() { + return columnName; + } + public CarbonColumn getCarbonColumn() { return carbonColumn; } @@ -120,7 +128,8 @@ public class ColumnExpression extends LeafExpression { this.carbonColumn = carbonColumn; } - @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) { + @Override + public void findAndSetChild(Expression oldExpr, Expression newExpr) { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java index da0df05..13acc63 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/Expression.java @@ -51,4 +51,5 @@ public abstract class Expression implements Serializable { public abstract String getString(); + public abstract String getStatement(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java index b194e3c..14accc1 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/LiteralExpression.java @@ -36,7 +36,8 @@ public class LiteralExpression extends LeafExpression { this.dataType = dataType; } - @Override public ExpressionResult evaluate(RowIntf value) { + @Override + public ExpressionResult evaluate(RowIntf value) { return new ExpressionResult(dataType, this.value, true); } @@ -44,16 +45,23 @@ public class LiteralExpression extends LeafExpression { return new ExpressionResult(dataType, this.value, true); } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { // TODO Auto-generated method stub return ExpressionType.LITERAL; } - @Override public String getString() { + @Override + public String getString() { // TODO Auto-generated method stub return "LiteralExpression(" + value + ')'; } + @Override + public String getStatement() { + return value == null ? null : value.toString(); + } + /** * getLiteralExpDataType. 
* @@ -67,7 +75,8 @@ public class LiteralExpression extends LeafExpression { return value; } - @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) { + @Override + public void findAndSetChild(Expression oldExpr, Expression newExpr) { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java index 3677b51..3ab2885 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/MatchExpression.java @@ -54,4 +54,9 @@ public class MatchExpression extends Expression { public String getString() { return queryString; } + + @Override + public String getStatement() { + return queryString; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java index faf5bb1..0588f2c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java @@ -40,7 +40,8 @@ public class EqualToExpression extends BinaryConditionalExpression { this.isNull = isNull; } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws 
FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult elRes = left.evaluate(value); ExpressionResult erRes = right.evaluate(value); @@ -91,12 +92,22 @@ public class EqualToExpression extends BinaryConditionalExpression { return val1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.EQUALS; } - @Override public String getString() { + @Override + public String getString() { return "EqualTo(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + if (isNull) { + return left.getStatement() + " is " + right.getStatement(); + } else { + return left.getStatement() + " = " + right.getStatement(); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java index 24575d2..c15ae62 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java @@ -74,11 +74,18 @@ public class GreaterThanEqualToExpression extends BinaryConditionalExpression { return exprResVal1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.GREATERTHAN_EQUALTO; } - @Override public String getString() { + @Override + public String getString() { return "GreaterThanEqualTo(" + left.getString() + ',' + right.getString() + ')'; } + + @Override + public String 
getStatement() { + return left.getStatement() + " >= " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java index ddc3d30..aecd782 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java @@ -34,7 +34,8 @@ public class GreaterThanExpression extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult exprLeftRes = left.evaluate(value); ExpressionResult exprRightRes = right.evaluate(value); @@ -76,12 +77,18 @@ public class GreaterThanExpression extends BinaryConditionalExpression { return val1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.GREATERTHAN; } - @Override public String getString() { + @Override + public String getString() { return "GreaterThan(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " > " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java index a560cc3..11d538e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java @@ -38,7 +38,8 @@ public class InExpression extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult leftRsult = left.evaluate(value); @@ -93,12 +94,18 @@ public class InExpression extends BinaryConditionalExpression { return leftRsult; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.IN; } - @Override public String getString() { + @Override + public String getString() { return "IN(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " in " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java index df7d791..6fee008 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java @@ 
-74,13 +74,19 @@ public class LessThanEqualToExpression extends BinaryConditionalExpression { return exprResValue1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { // TODO Auto-generated method stub return ExpressionType.LESSTHAN_EQUALTO; } - @Override public String getString() { + @Override + public String getString() { return "LessThanEqualTo(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " <= " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java index f4b7f7c..4e4c240 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java @@ -78,12 +78,18 @@ public class LessThanExpression extends BinaryConditionalExpression { return val1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.LESSTHAN; } - @Override public String getString() { + @Override + public String getString() { return "LessThan(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " < " + right.getStatement(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java index 1c00b14..32b5028 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ListExpression.java @@ -34,7 +34,8 @@ public class ListExpression extends Expression { this.children = children; } - @Override public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException { + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException { List<ExpressionResult> listOfExprRes = new ArrayList<ExpressionResult>(10); for (Expression expr : children) { @@ -47,11 +48,13 @@ public class ListExpression extends Expression { return new ExpressionResult(listOfExprRes); } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.LIST; } - @Override public String getString() { + @Override + public String getString() { StringBuffer value = new StringBuffer(); value.append("ListExpression("); for (Expression expr : children) { @@ -62,6 +65,19 @@ public class ListExpression extends Expression { return value.toString(); } - @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) { + @Override + public String getStatement() { + StringBuffer value = new StringBuffer(); + value.append("("); + for (Expression expr : children) { + value.append(expr.getString()).append(";"); + } + value.append(')'); + + return value.toString(); + } + + @Override + public void findAndSetChild(Expression oldExpr, Expression newExpr) { } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java index 5046722..eae8019 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java @@ -39,7 +39,8 @@ public class NotEqualsExpression extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult elRes = left.evaluate(value); ExpressionResult erRes = right.evaluate(value); @@ -87,11 +88,18 @@ public class NotEqualsExpression extends BinaryConditionalExpression { return val1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.NOT_EQUALS; } - @Override public String getString() { + @Override + public String getString() { return "NotEquals(" + left.getString() + ',' + right.getString() + ')'; } + + @Override + public String getStatement() { + return left.getStatement() + " <> " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java index c4a2fc8..cbc2995 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java @@ -38,7 +38,8 @@ public class NotInExpression extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { // Both left and right result need to be checked for null because NotInExpression is basically @@ -104,12 +105,18 @@ public class NotInExpression extends BinaryConditionalExpression { return leftRsult; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.NOT_IN; } - @Override public String getString() { + @Override + public String getString() { return "NOT IN(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " not in " + right.getStatement(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java index 18c7374..cabcaf2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/StartsWithExpression.java @@ -33,7 +33,8 @@ public class StartsWithExpression 
extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult exprLeftRes = left.evaluate(value); ExpressionResult exprRightRes = right.evaluate(value); @@ -61,12 +62,18 @@ public class StartsWithExpression extends BinaryConditionalExpression { return val1; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.STARTSWITH; } - @Override public String getString() { + @Override + public String getString() { return "StartsWith(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return left.getStatement() + " like '" + right.getStatement() + "%'"; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java index 4a3508b..aa01998 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/AndExpression.java @@ -33,7 +33,8 @@ public class AndExpression extends BinaryLogicalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult resultLeft = left.evaluate(value); ExpressionResult resultRight = right.evaluate(value); @@ -46,14 +47,20 @@ public class AndExpression extends 
BinaryLogicalExpression { return resultLeft; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { // TODO Auto-generated method stub return ExpressionType.AND; } - @Override public String getString() { + @Override + public String getString() { // TODO Auto-generated method stub return "And(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return "(" + left.getStatement() + " and " + right.getStatement() + ")"; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java index f62980b..5e8010c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/FalseExpression.java @@ -50,7 +50,8 @@ public class FalseExpression extends BinaryConditionalExpression { * @throws FilterUnsupportedException * @throws FilterIllegalMemberException */ - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { return new ExpressionResult(DataTypes.BOOLEAN,false); } @@ -59,10 +60,17 @@ public class FalseExpression extends BinaryConditionalExpression { * This method will return the expression types * @return */ - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.FALSE; } - @Override public String getString() { + @Override + public String getString() { return 
null; } + + @Override + public String getStatement() { + return "false"; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java index bd10f10..148081a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/OrExpression.java @@ -33,7 +33,8 @@ public class OrExpression extends BinaryLogicalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterIllegalMemberException, FilterUnsupportedException { ExpressionResult resultLeft = left.evaluate(value); ExpressionResult resultRight = right.evaluate(value); @@ -46,12 +47,18 @@ public class OrExpression extends BinaryLogicalExpression { return resultLeft; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.OR; } - @Override public String getString() { + @Override + public String getString() { return "Or(" + left.getString() + ',' + right.getString() + ')'; } + @Override + public String getStatement() { + return "(" + left.getString() + " or " + right.getString() + ")"; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java index 01e3270..5d09fb1 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/RangeExpression.java @@ -40,7 +40,8 @@ public class RangeExpression extends BinaryConditionalExpression { super(left, right); } - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { ExpressionResult resultLeft = left.evaluate(value); ExpressionResult resultRight = right.evaluate(value); @@ -53,15 +54,23 @@ public class RangeExpression extends BinaryConditionalExpression { return resultLeft; } - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.RANGE; } - @Override public String getString() { + @Override + public String getString() { return null; } - @Override public List<ExpressionResult> getLiterals() { + @Override + public String getStatement() { + return left.getStatement() + " between " + right.getStatement(); + } + + @Override + public List<ExpressionResult> getLiterals() { List<ExpressionResult> listOfExp = new ArrayList<ExpressionResult>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); getLiteralsResult(this, listOfExp); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java index 39db1d2..3948589 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/logical/TrueExpression.java @@ -48,7 +48,8 @@ public class TrueExpression extends BinaryConditionalExpression { * @throws FilterUnsupportedException * @throws FilterIllegalMemberException */ - @Override public ExpressionResult evaluate(RowIntf value) + @Override + public ExpressionResult evaluate(RowIntf value) throws FilterUnsupportedException, FilterIllegalMemberException { return new ExpressionResult(DataTypes.BOOLEAN,true); } @@ -57,10 +58,17 @@ public class TrueExpression extends BinaryConditionalExpression { * This method will return the expression types * @return */ - @Override public ExpressionType getFilterExpressionType() { + @Override + public ExpressionType getFilterExpressionType() { return ExpressionType.TRUE; } - @Override public String getString() { + @Override + public String getString() { return null; } + + @Override + public String getStatement() { + return "true"; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java index 4e3665f..47a6bdd 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java @@ -227,6 +227,10 @@ public class QueryModel { return queryId; } + public void setQueryId(String queryId) { + this.queryId = queryId; + } + /** * @return the tableBlockInfos */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java index 09b0862..130bc43 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderDummy.java @@ -40,14 +40,16 @@ public class DriverQueryStatisticsRecorderDummy implements QueryStatisticsRecord } - public void logStatisticsAsTableExecutor() { + public TaskStatistics statisticsForTask(long taskId, long startTime) { + return null; + } + + public void logStatisticsForTask(TaskStatistics task) { } /** * Below method will be used to add the statistics - * - * @param statistic */ public synchronized void recordStatisticsForDriver(QueryStatistic statistic, String queryId) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java index b38632f..20536fb 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/DriverQueryStatisticsRecorderImpl.java @@ -67,7 +67,11 @@ public class DriverQueryStatisticsRecorderImpl implements QueryStatisticsRecorde } - public void logStatisticsAsTableExecutor() { + public TaskStatistics statisticsForTask(long taskId, long startTime) { + return null; + } + + public void logStatisticsForTask(TaskStatistics task) { } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java index b97b137..5b0bd6a 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatistic.java @@ -109,4 +109,8 @@ public class QueryStatistic implements Serializable { return this.count; } + public long getStartTime() { + return this.startTime; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java index a52f70b..69bd485 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorder.java @@ -25,7 +25,9 @@ public interface QueryStatisticsRecorder { void logStatistics(); - void logStatisticsAsTableExecutor(); + TaskStatistics statisticsForTask(long taskId, long startTime); + + void logStatisticsForTask(TaskStatistics task); void recordStatisticsForDriver(QueryStatistic statistic, String queryId); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java 
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java index 57a5f5b..e312f53 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderDummy.java @@ -48,10 +48,11 @@ public class QueryStatisticsRecorderDummy implements QueryStatisticsRecorder,Ser } - /** - * Below method will be used to show statistic log as table - */ - public void logStatisticsAsTableExecutor() { + public TaskStatistics statisticsForTask(long taskId, long startTime) { + return null; + } + + public void logStatisticsForTask(TaskStatistics task) { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java index 9807eaa..78ebd09 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java @@ -23,8 +23,6 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import static org.apache.carbondata.core.util.CarbonUtil.printLine; - /** * Class will be used to record and log the query statistics */ @@ -46,13 +44,13 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser private List<QueryStatistic> queryStatistics; /** - * query with taskd + * query id with task */ - private String queryIWthTask; + private String queryId; public QueryStatisticsRecorderImpl(String queryId) { queryStatistics = new ArrayList<QueryStatistic>(); - this.queryIWthTask = queryId; + this.queryId 
= queryId; } /** @@ -69,122 +67,25 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser */ public void logStatistics() { for (QueryStatistic statistic : queryStatistics) { - LOGGER.statistic(statistic.getStatistics(queryIWthTask)); + LOGGER.statistic(statistic.getStatistics(queryId)); } } /** * Below method will be used to show statistic log as table */ - public void logStatisticsAsTableExecutor() { - String tableInfo = collectExecutorStatistics(); - if (null != tableInfo) { - LOGGER.statistic(tableInfo); + public void logStatisticsForTask(TaskStatistics result) { + if (null != result) { + LOGGER.statistic("Print query statistic for each task id:" + "\n" + result.toString()); } } - /** - * Below method will parse queryStatisticsMap and put time into table - */ - public String collectExecutorStatistics() { - long load_blocks_time = 0; - long scan_blocks_time = 0; - long scan_blocks_num = 0; - long load_dictionary_time = 0; - long result_size = 0; - long total_executor_time = 0; - long total_blocklet = 0; - long valid_scan_blocklet = 0; - long valid_pages_blocklet = 0; - long total_pages = 0; - long readTime = 0; - long scannedPages = 0; + public TaskStatistics statisticsForTask(long taskId, long startTime) { try { - for (QueryStatistic statistic : queryStatistics) { - if (statistic.getMessage() != null) { - switch (statistic.getMessage()) { - case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR: - load_blocks_time += statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.SCAN_BLOCKlET_TIME: - scan_blocks_time += statistic.getCount(); - break; - case QueryStatisticsConstants.SCAN_BLOCKS_NUM: - scan_blocks_num += statistic.getCount(); - break; - case QueryStatisticsConstants.LOAD_DICTIONARY: - load_dictionary_time += statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.RESULT_SIZE: - result_size += statistic.getCount(); - break; - case QueryStatisticsConstants.EXECUTOR_PART: - total_executor_time += 
statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM: - total_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM: - valid_scan_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.VALID_PAGE_SCANNED: - valid_pages_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.TOTAL_PAGE_SCANNED: - total_pages = statistic.getCount(); - break; - case QueryStatisticsConstants.READ_BLOCKlET_TIME: - readTime = statistic.getCount(); - break; - case QueryStatisticsConstants.PAGE_SCANNED: - scannedPages = statistic.getCount(); - break; - default: - break; - } - } - } - String headers = - "task_id,load_blocks_time,load_dictionary_time,carbon_scan_time,carbon_IO_time, " - + "total_executor_time,scan_blocks_num,total_blocklets," - + "valid_blocklets,total_pages,scanned_pages,valid_pages,result_size"; - List<String> values = new ArrayList<String>(); - values.add(queryIWthTask); - values.add(load_blocks_time + "ms"); - values.add(load_dictionary_time + "ms"); - values.add(scan_blocks_time + "ms"); - values.add(readTime + "ms"); - values.add(total_executor_time + "ms"); - values.add(String.valueOf(scan_blocks_num)); - values.add(String.valueOf(total_blocklet)); - values.add(String.valueOf(valid_scan_blocklet)); - values.add(String.valueOf(total_pages)); - values.add(String.valueOf(scannedPages)); - values.add(String.valueOf(valid_pages_blocklet)); - values.add(String.valueOf(result_size)); - StringBuilder tableInfo = new StringBuilder(); - String[] columns = headers.split(","); - StringBuilder line = new StringBuilder(""); - StringBuilder hearLine = new StringBuilder(""); - StringBuilder valueLine = new StringBuilder(""); - for (int i = 0; i < columns.length; i++) { - int len = Math.max(columns[i].length(), values.get(i).length()); - line.append("+").append(printLine("-", len)); - hearLine.append("|").append(printLine(" ", len - columns[i].length())) - 
.append(columns[i]); - valueLine.append("|").append(printLine(" ", len - values.get(i).length())) - .append(values.get(i)); - } - // struct table info - tableInfo.append(line).append("+").append("\n"); - tableInfo.append(hearLine).append("|").append("\n"); - tableInfo.append(line).append("+").append("\n"); - tableInfo.append(valueLine).append("|").append("\n"); - tableInfo.append(line).append("+").append("\n"); - return "Print query statistic for each task id:" + "\n" + tableInfo.toString(); + return new TaskStatistics(queryId, taskId).build(startTime, queryStatistics); } catch (Exception ex) { LOGGER.error(ex); - return "Put statistics into table failed, catch exception: " + ex.getMessage(); + return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java new file mode 100644 index 0000000..7e4d89f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/stats/TaskStatistics.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.stats; + +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.util.CarbonUtil; + +@InterfaceAudience.Developer("stats") +@InterfaceStability.Evolving +public class TaskStatistics implements Serializable { + + private static final Column[] columns = { + new Column("query_id", "query id"), + new Column("task_id", "spark task id"), + new Column("start_time", "start time"), + new Column("total_time", QueryStatisticsConstants.EXECUTOR_PART, true), + new Column("load_blocks_time", QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, true), + new Column("load_dictionary_time", QueryStatisticsConstants.LOAD_DICTIONARY, true), + new Column("carbon_scan_time", QueryStatisticsConstants.SCAN_BLOCKlET_TIME, true), + new Column("carbon_IO_time", QueryStatisticsConstants.READ_BLOCKlET_TIME, true), + new Column("scan_blocks_num", QueryStatisticsConstants.SCAN_BLOCKS_NUM), + new Column("total_blocklets", QueryStatisticsConstants.TOTAL_BLOCKLET_NUM), + new Column("valid_blocklets", QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM), + new Column("total_pages", QueryStatisticsConstants.TOTAL_PAGE_SCANNED), + new Column("scanned_pages", QueryStatisticsConstants.PAGE_SCANNED), + new Column("valid_pages", QueryStatisticsConstants.VALID_PAGE_SCANNED), + new Column("result_size", QueryStatisticsConstants.RESULT_SIZE) + }; + + private static final int numOfColumns = columns.length; + + private String queryId; + + private long[] values = new long[numOfColumns]; + + private long fileSize; + + private String[] files; + + TaskStatistics(String queryId, long taskId) { + this.queryId = queryId; + this.values[1] = taskId; + } + + public
TaskStatistics(String queryId, long[] values, long fileSize, String[] files) { + this.queryId = queryId; + this.values = values; + this.fileSize = fileSize; + this.files = files; + } + + public TaskStatistics build(long startTime, List<QueryStatistic> queryStatistics) { + this.values[2] = startTime; + for (QueryStatistic statistic : queryStatistics) { + if (statistic.getMessage() != null) { + for (int columnIndex = 3; columnIndex <= numOfColumns - 1; columnIndex++) { + if (columns[columnIndex].comment.equals(statistic.getMessage())) { + if (columns[columnIndex].isDuration) { + values[columnIndex] += statistic.getTimeTaken(); + } else { + values[columnIndex] += statistic.getCount(); + } + break; + } + } + } + } + return this; + } + + @Override public String toString() { + StringBuilder builder = new StringBuilder(); + printStatisticTable(Arrays.asList(this), builder, ""); + return builder.toString(); + } + + public static void printStatisticTable(List<TaskStatistics> stats, StringBuilder builder, + String indent) { + int numOfRows = stats.size(); + int numOfColumns = columns.length; + + // header as string[] + String[] header = new String[numOfColumns]; + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + header[columnIndex] = columns[columnIndex].name; + } + + // convert rows to string[][] + String[][] rows = new String[numOfRows][]; + for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) { + rows[rowIndex] = stats.get(rowIndex).convertValueToString(); + } + + CarbonUtil.logTable(builder, header, rows, indent); + } + + private String[] convertValueToString() { + String[] valueStrings = new String[numOfColumns]; + valueStrings[0] = queryId; + for (int i = 1; i < numOfColumns; i++) { + if (columns[i].isDuration) { + valueStrings[i] = String.valueOf(values[i]) + "ms"; + } else { + valueStrings[i] = String.valueOf(values[i]); + } + } + valueStrings[2] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(values[2]); + return valueStrings; + 
} + + private static class Column { + String name; + String comment; + boolean isDuration; + + Column(String name, String comment) { + this.name = name; + this.comment = comment; + this.isDuration = false; + } + + Column(String name, String comment, boolean isDuration) { + this(name, comment); + this.isDuration = isDuration; + } + } + + public String getQueryId() { + return queryId; + } + + public long[] getValues() { + return values; + } + + public long getFileSize() { + return fileSize; + } + + public String[] getFiles() { + return files; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 5ac359b..4d8caed 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -226,6 +226,7 @@ public final class CarbonProperties { validateSortMemorySizeInMB(); validateWorkingMemory(); validateSortStorageMemory(); + validateEnableQueryStatistics(); } /** @@ -1365,6 +1366,27 @@ public final class CarbonProperties { unsafeSortStorageMemory + ""); } + private void validateEnableQueryStatistics() { + String enableQueryStatistics = carbonProperties.getProperty( + CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT); + boolean isValidBooleanValue = CarbonUtil.validateBoolean(enableQueryStatistics); + if (!isValidBooleanValue) { + LOGGER.warn("The enable query statistics value \"" + enableQueryStatistics + + "\" is invalid. 
Using the default value \"" + + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT); + carbonProperties.setProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT); + } + } + + public boolean isEnableQueryStatistics() { + String enableQueryStatistics = carbonProperties.getProperty( + CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT); + return enableQueryStatistics.equalsIgnoreCase("true"); + } + /** * Get the heap memory pooling threshold bytes. */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 84d3c6c..1082d78 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1428,6 +1428,107 @@ public final class CarbonUtil { } /** + * append a string with left pad to the string builder + */ + public static void leftPad(StringBuilder builder, String a, int length, char pad) { + if (builder == null || a == null) { + return; + } + int padLength = length - a.length(); + if (padLength > 0) { + for (int i = 0; i < padLength; i++) { + builder.append(pad); + } + } + if (a.length() > 0) { + builder.append(a); + } + } + + /** + * append a string with right pad to the string builder + */ + public static void rightPad(StringBuilder builder, String a, int length, char pad) { + if (builder == null || a == null) { + return; + } + int padLength = length - a.length(); + if (a.length() > 0) { + builder.append(a); + } + if (padLength > 0) { + for (int i = 0; i < padLength; i++) { + builder.append(pad); + } + } + } + + /** + * log information as table + */ + public static void 
logTable(StringBuilder builder, String[] header, String[][] rows, + String indent) { + int numOfRows = rows.length; + int numOfColumns = header.length; + + // calculate max length of each column + int[] maxLengths = new int[numOfColumns]; + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + maxLengths[columnIndex] = header[columnIndex].length(); + } + for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) { + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + maxLengths[columnIndex] = + Math.max(maxLengths[columnIndex], rows[rowIndex][columnIndex].length()); + } + } + + // build line + StringBuilder line = new StringBuilder("+"); + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + CarbonUtil.leftPad(line, "", maxLengths[columnIndex], '-'); + line.append("+"); + } + + // append head + builder.append(indent).append(line).append("\n").append(indent).append("|"); + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + CarbonUtil.rightPad(builder, header[columnIndex], maxLengths[columnIndex], ' '); + builder.append("|"); + } + builder.append("\n").append(indent).append(line); + + // append rows + for (int rowIndex = 0; rowIndex < numOfRows; rowIndex++) { + builder.append("\n").append(indent).append("|"); + for (int columnIndex = 0; columnIndex < numOfColumns; columnIndex++) { + CarbonUtil.leftPad(builder, rows[rowIndex][columnIndex], maxLengths[columnIndex], ' '); + builder.append("|"); + } + builder.append("\n").append(indent).append(line); + } + } + + public static void logTable(StringBuilder builder, String context, String indent) { + String[] rows = context.split("\n"); + int maxLength = 0; + for (String row: rows) { + maxLength = Math.max(maxLength, row.length()); + } + StringBuilder line = new StringBuilder("+"); + CarbonUtil.rightPad(line, "", maxLength, '-'); + line.append("+"); + + builder.append(indent).append(line); + for (String row: rows) { + 
builder.append("\n").append(indent).append("|"); + CarbonUtil.rightPad(builder, row, maxLength, ' '); + builder.append("|"); + } + builder.append("\n").append(indent).append(line); + } + + /** * Below method will be used to get the list of values in * comma separated string format * http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java b/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java index 39195b3..cfbe7a7 100644 --- a/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/stats/QueryStasticsRecorderImplTest.java @@ -98,11 +98,10 @@ public class QueryStasticsRecorderImplTest { .addStatistics(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, 5L); queryStasticsRecorderImpl.logStatistics(); queryStasticsRecorderImpl.recordStatistics(queryStatisticWithVALID_SCAN_BLOCKLET_NUM); - queryStasticsRecorderImpl.logStatisticsAsTableExecutor(); } @Test public void testcollectExecutorStatistics() { - assertNotNull(queryStasticsRecorderImpl.collectExecutorStatistics()); + assertNotNull(queryStasticsRecorderImpl.statisticsForTask(1, 1)); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/examples/spark2/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/resources/log4j.properties b/examples/spark2/src/main/resources/log4j.properties new file mode 100644 index 0000000..4db4f7a --- /dev/null +++ b/examples/spark2/src/main/resources/log4j.properties @@ -0,0 +1,7 @@ +log4j.logger.org.apache.spark.sql.profiler.ProfilerLogger$=INFO,F1 +log4j.appender.F1=org.apache.log4j.RollingFileAppender 
+log4j.appender.F1.File=${path.target}/profiler.log +log4j.appender.F1.MaxFileSize=4024KB +log4j.appender.F1.MaxBackupIndex=20 +log4j.appender.F1.layout=org.apache.log4j.PatternLayout +log4j.appender.F1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala index 7a15327..d97a9f0 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala @@ -19,11 +19,27 @@ package org.apache.carbondata.examples import java.io.File +import org.apache.log4j.PropertyConfigurator + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + object CarbonSessionExample { def main(args: Array[String]) { + + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + System.setProperty("path.target", s"$rootPath/examples/spark2/target") + // print profiler log to a separated file: target/profiler.log + PropertyConfigurator.configure( + s"$rootPath/examples/spark2/src/main/resources/log4j.properties") + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") + val spark = ExampleUtils.createCarbonSession("CarbonSessionExample") - spark.sparkContext.setLogLevel("WARN") + spark.sparkContext.setLogLevel("INFO") spark.sql("DROP TABLE IF EXISTS carbon_table") @@ -40,15 +56,12 @@ object CarbonSessionExample { | decimalField DECIMAL(18,2), | dateField DATE, | 
charField CHAR(5), - | floatField FLOAT, - | complexData ARRAY<STRING> + | floatField FLOAT | ) | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField') + | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField') """.stripMargin) - val rootPath = new File(this.getClass.getResource("/").getPath - + "../../../..").getCanonicalPath val path = s"$rootPath/examples/spark2/src/main/resources/data.csv" // scalastyle:off http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 7c06b60..448cf28 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -88,7 +88,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { this.length = length; } - private void calculateLength() { + public void calculateLength() { long total = 0; if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) { Map<String, Long> blockSizes = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index c0366d2..c057129 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -158,6 +158,7 @@ public class 
CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException { + numSegments = validSegments.size(); List<InputSplit> result = new LinkedList<InputSplit>(); UpdateVO invalidBlockVOForSegmentId = null; Boolean isIUDTable = false; @@ -173,6 +174,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); + numBlocks = dataBlocksOfSegment.size(); for (CarbonInputSplit inputSplit : dataBlocksOfSegment) { // Get the UpdateVO for those tables on which IUD operations being performed. http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 5506af6..8d2318b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -107,6 +107,23 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { private static final String PARTITIONS_TO_PRUNE = "mapreduce.input.carboninputformat.partitions.to.prune"; + // record segment number and hit blocks + protected int numSegments = 0; + protected int numStreamSegments = 0; + protected int numBlocks = 0; + + public int getNumSegments() { + return numSegments; + } + + public int getNumStreamSegments() { + return numStreamSegments; + } + + public int getNumBlocks() { + return numBlocks; + } + /** * Set the `tableInfo` in `configuration` 
*/ http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 290b3d7..14f4c71 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -284,7 +284,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { List<Segment> streamSegments) throws IOException { List<InputSplit> splits = new ArrayList<InputSplit>(); if (streamSegments != null && !streamSegments.isEmpty()) { - + numStreamSegments = streamSegments.size(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (Segment segment : streamSegments) { @@ -459,6 +459,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { List<Integer> oldPartitionIdList, SegmentUpdateStatusManager updateStatusManager) throws IOException { + numSegments = validSegments.size(); List<InputSplit> result = new LinkedList<InputSplit>(); UpdateVO invalidBlockVOForSegmentId = null; Boolean isIUDTable = false; @@ -472,6 +473,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); + numBlocks = dataBlocksOfSegment.size(); for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { // Get the UpdateVO for those tables on which IUD operations being performed. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala new file mode 100644 index 0000000..998e7a9 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/profiler/ProfilerSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.profiler + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.SparkEnv +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class ProfilerSuite extends QueryTest with BeforeAndAfterAll { + var setupEndpointRef: RpcEndpointRef = _ + var statementMessages: ArrayBuffer[ProfilerMessage] = _ + var executionMessages: ArrayBuffer[ProfilerMessage] = _ + val profilerEndPoint = new ProfilerEndPoint + val listener = new ProfilerListener + + override def beforeAll(): Unit = { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + "true" + ) + Profiler.setIsEnable(true) + setupEndpointRef = SparkEnv.get.rpcEnv.setupEndpoint( + "CarbonProfiler", + new ProfilerEndPoint() { + // override method for testing + override def processSQLStart(statementId: Long, + messages: ArrayBuffer[ProfilerMessage]): Unit = { + statementMessages = messages + } + // override method for testing + override def processExecutionEnd(executionId: Long, + messages: ArrayBuffer[ProfilerMessage]): Unit = { + executionMessages = messages + } + }) + sqlContext.sparkContext.addSparkListener(listener) + } + + def cleanMessages(): Unit = { + statementMessages = null + executionMessages = null + } + + def processSQLStart(statementId: Long, + messages: ArrayBuffer[ProfilerMessage]): Unit = { + try { + profilerEndPoint.processSQLStart(statementId, messages) + } catch { + case _ => + assert(false, "Failed to log StatementSummary") + } + } + + def processExecutionEnd(executionId: Long, + messages: ArrayBuffer[ProfilerMessage]): Unit = { + try { + profilerEndPoint.processExecutionEnd(executionId, messages) + } catch { + case _ => + assert(false, "Failed to log ExecutionSummary") + } + } + + def checkCommand(sqlText: 
String): Unit = { + sql(sqlText) + Thread.sleep(1000) + assertResult(1)(statementMessages.length) + assert(statementMessages(0).isInstanceOf[SQLStart]) + val dropSQLStart = statementMessages(0).asInstanceOf[SQLStart] + assertResult(sqlText)(dropSQLStart.sqlText) + processSQLStart(dropSQLStart.statementId, statementMessages) + cleanMessages() + } + + def checkSelectQuery( + sqlText: String, + executionMsgCount: Int + ): Unit = { + sql(sqlText).collect() + Thread.sleep(1000) + assertResult(2)(statementMessages.length) + assert(statementMessages(0).isInstanceOf[SQLStart]) + assert(statementMessages(1).isInstanceOf[Optimizer]) + val sqlStart = statementMessages(0).asInstanceOf[SQLStart] + assertResult(sqlText)(sqlStart.sqlText) + processSQLStart(sqlStart.statementId, statementMessages) + assertResult(executionMsgCount)(executionMessages.length) + assert(executionMessages(0).isInstanceOf[ExecutionStart]) + assert(executionMessages.exists(_.isInstanceOf[GetPartition])) + assert(executionMessages.exists(_.isInstanceOf[QueryTaskEnd])) + assert(executionMessages(executionMsgCount - 1).isInstanceOf[ExecutionEnd]) + val executionStart = executionMessages(0).asInstanceOf[ExecutionStart] + processExecutionEnd(executionStart.executionId, executionMessages) + cleanMessages() + } + + test("collect messages to driver side") { + // drop table + checkCommand("DROP TABLE IF EXISTS mobile") + checkCommand("DROP TABLE IF EXISTS emp") + // create table + checkCommand("CREATE TABLE mobile (mid string,mobileId string, color string, id int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_EXCLUDE'='Color')") + checkCommand("CREATE TABLE emp (eid string,eName string, mobileId string,color string, id int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_EXCLUDE'='Color')") + // load data + checkCommand(s"LOAD DATA LOCAL INPATH '$resourcesPath/join/mobile.csv' INTO TABLE mobile OPTIONS('FILEHEADER'='mid,mobileId,color,id')") + checkCommand(s"LOAD DATA LOCAL INPATH 
'$resourcesPath/join/employee.csv' INTO TABLE emp OPTIONS('FILEHEADER'='eid,eName,mobileId,color,id')") + // select query + checkSelectQuery("SELECT * FROM mobile where mid = 'mid89'", 4) + checkSelectQuery("SELECT * FROM emp where eid = 'empid292'", 4) + checkSelectQuery("SELECT * FROM emp JOIN mobile ON emp.mobileId=mobile.mobileId", 6) + // drop table + checkCommand("DROP TABLE IF EXISTS mobile") + checkCommand("DROP TABLE IF EXISTS emp") + } + + override def afterAll(): Unit = { + SparkEnv.get.rpcEnv.stop(setupEndpointRef) + sqlContext.sparkContext.listenerBus.removeListener(listener) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT + ) + Profiler.setIsEnable(false) + sql("DROP TABLE IF EXISTS mobile") + sql("DROP TABLE IF EXISTS emp") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/d5bec4dd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 9d2b6e5..2be0efc 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -35,6 +35,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd} import org.apache.spark.sql.util.SparkSQLUtil.sessionState import org.apache.carbondata.common.logging.LogServiceFactory @@ -86,65 +88,111 @@ class 
CarbonScanRDD( @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) override def getPartitions: Array[Partition] = { - val conf = new Configuration() - val jobConf = new JobConf(conf) - SparkHadoopUtil.get.addCredentials(jobConf) - val job = Job.getInstance(jobConf) - val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat") - val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) { - prepareFileInputFormatForDriver(job.getConfiguration) - } else { - prepareInputFormatForDriver(job.getConfiguration) - } - // initialise query_id for job - job.getConfiguration.set("query.id", queryId) - - // get splits - val splits = format.getSplits(job) - if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) { - throw new SparkException( - "CarbonData file not exist in the segment_null (SDK writer Output) path") - } + val startTime = System.currentTimeMillis() + var partitions: Array[Partition] = Array.empty[Partition] + var getSplitsStartTime: Long = -1 + var getSplitsEndTime: Long = -1 + var distributeStartTime: Long = -1 + var distributeEndTime: Long = -1 + val tablePath = tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath + var numSegments = 0 + var numStreamSegments = 0 + var numBlocks = 0 - // separate split - // 1. for batch splits, invoke distributeSplits method to create partitions - // 2. 
for stream splits, create partition for each split by default - val columnarSplits = new ArrayList[InputSplit]() - val streamSplits = new ArrayBuffer[InputSplit]() - splits.asScala.foreach { split => - val carbonInputSplit = split.asInstanceOf[CarbonInputSplit] - if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) { - streamSplits += split + try { + val conf = new Configuration() + val jobConf = new JobConf(conf) + SparkHadoopUtil.get.addCredentials(jobConf) + val job = Job.getInstance(jobConf) + val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat") + val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) { + prepareFileInputFormatForDriver(job.getConfiguration) } else { - columnarSplits.add(split) + prepareInputFormatForDriver(job.getConfiguration) } - } - val batchPartitions = distributeColumnarSplits(columnarSplits) - // check and remove InExpression from filterExpression - checkAndRemoveInExpressinFromFilterExpression(batchPartitions) - if (streamSplits.isEmpty) { - batchPartitions.toArray - } else { - val index = batchPartitions.length - val streamPartitions: mutable.Buffer[Partition] = - streamSplits.zipWithIndex.map { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit( - Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, - splitWithIndex._1.getLocations, - FileFormat.ROW_V1) - new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit) + // initialise query_id for job + job.getConfiguration.set("query.id", queryId) + + // get splits + getSplitsStartTime = System.currentTimeMillis() + val splits = format.getSplits(job) + getSplitsEndTime = System.currentTimeMillis() + if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) { + throw new SparkException( + "CarbonData file not exist in the segment_null (SDK writer Output) path") + } + numSegments = format.getNumSegments + numStreamSegments = format.getNumStreamSegments + 
numBlocks = format.getNumBlocks + + // separate split + // 1. for batch splits, invoke distributeSplits method to create partitions + // 2. for stream splits, create partition for each split by default + val columnarSplits = new ArrayList[InputSplit]() + val streamSplits = new ArrayBuffer[InputSplit]() + splits.asScala.foreach { split => + val carbonInputSplit = split.asInstanceOf[CarbonInputSplit] + if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) { + streamSplits += split + } else { + columnarSplits.add(split) } - if (batchPartitions.isEmpty) { - streamPartitions.toArray + } + distributeStartTime = System.currentTimeMillis() + val batchPartitions = distributeColumnarSplits(columnarSplits) + distributeEndTime = System.currentTimeMillis() + // check and remove InExpression from filterExpression + checkAndRemoveInExpressinFromFilterExpression(batchPartitions) + if (streamSplits.isEmpty) { + partitions = batchPartitions.toArray } else { - logInfo( - s""" - | Identified no.of Streaming Blocks: ${streamPartitions.size}, + val index = batchPartitions.length + val streamPartitions: mutable.Buffer[Partition] = + streamSplits.zipWithIndex.map { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit( + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations, + FileFormat.ROW_V1) + new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit) + } + if (batchPartitions.isEmpty) { + partitions = streamPartitions.toArray + } else { + logInfo( + s""" + | Identified no.of Streaming Blocks: ${ streamPartitions.size }, """.stripMargin) - // should keep the order by index of partition - batchPartitions.appendAll(streamPartitions) - batchPartitions.toArray + // should keep the order by index of partition + batchPartitions.appendAll(streamPartitions) + partitions = batchPartitions.toArray + } + } + partitions + } finally { + Profiler.invokeIfEnable { + val endTime = System.currentTimeMillis() + val executionId = 
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY).toLong + Profiler.send( + GetPartition( + executionId, + tableInfo.getDatabaseName + "." + tableInfo.getFactTable.getTableName, + tablePath, + queryId, + partitions.length, + startTime, + endTime, + getSplitsStartTime, + getSplitsEndTime, + numSegments, + numStreamSegments, + numBlocks, + distributeStartTime, + distributeEndTime, + if (filterExpression == null) "" else filterExpression.getStatement, + if (columnProjection == null) "" else columnProjection.getAllColumns.mkString(",") + ) + ) } } } @@ -340,7 +388,8 @@ class CarbonScanRDD( System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties" ) } - + val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + val taskId = split.index val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) val format = prepareInputFormatForExecutor(attemptContext.getConfiguration) @@ -349,6 +398,8 @@ class CarbonScanRDD( inputMetricsStats.initBytesReadCallback(context, inputSplit) val iterator = if (inputSplit.getAllSplits.size() > 0) { val model = format.createQueryModel(inputSplit, attemptContext) + // one query id per table + model.setQueryId(queryId) // get RecordReader by FileFormat val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { case FileFormat.ROW_V1 => @@ -388,7 +439,7 @@ class CarbonScanRDD( context.addTaskCompletionListener { _ => reader.close() close() - logStatistics(queryStartTime, model.getStatisticsRecorder) + logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split) } // initialize the reader reader.initialize(inputSplit, attemptContext) @@ -516,14 +567,40 @@ class CarbonScanRDD( format } - def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): Unit = { + def logStatistics( + executionId: String, + taskId: Long, + queryStartTime: Long, + 
recorder: QueryStatisticsRecorder, + split: Partition + ): Unit = { if (null != recorder) { val queryStatistic = new QueryStatistic() queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART, System.currentTimeMillis - queryStartTime) recorder.recordStatistics(queryStatistic) // print executor query statistics for each task_id - recorder.logStatisticsAsTableExecutor() + val statistics = recorder.statisticsForTask(taskId, queryStartTime) + if (statistics != null) { + Profiler.invokeIfEnable { + val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value + inputSplit.calculateLength() + val size = inputSplit.getLength + val files = inputSplit.getAllSplits.asScala.map { s => + s.getSegmentId + "/" + s.getPath.getName + }.toArray[String] + Profiler.send( + QueryTaskEnd( + executionId.toLong, + queryId, + statistics.getValues, + size, + files + ) + ) + } + } + recorder.logStatisticsForTask(statistics) } } @@ -531,7 +608,6 @@ class CarbonScanRDD( * This method will check and remove InExpression from filterExpression to prevent the List * Expression values from serializing and deserializing on executor * - * @param format * @param identifiedPartitions */ private def checkAndRemoveInExpressinFromFilterExpression(
