Repository: carbondata
Updated Branches:
  refs/heads/master 5ae596b76 -> 77217b370
[CARBONDATA-1847] Add inputSize for value read This closes #1607 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/77217b37 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/77217b37 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/77217b37 Branch: refs/heads/master Commit: 77217b370b6beba7408039fec465a36e0b824028 Parents: 5ae596b Author: Jacky Li <[email protected]> Authored: Mon Dec 4 18:49:03 2017 +0800 Committer: QiangCai <[email protected]> Committed: Mon Dec 4 23:23:08 2017 +0800 ---------------------------------------------------------------------- .../org/apache/carbondata/hadoop/InputMetricsStats.java | 4 ++++ .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 1 + .../spark/vectorreader/VectorizedCarbonRecordReader.java | 4 +++- .../main/scala/org/apache/spark/CarbonInputMetrics.scala | 9 +++++++++ .../apache/spark/sql/CarbonDatasourceHadoopRelation.scala | 3 ++- 5 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java index e678100..cc39b34 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/InputMetricsStats.java @@ -35,4 +35,8 @@ public interface InputMetricsStats extends Serializable { */ void updateAndClose(); + /** + * update metric by `value`, it can be ColumnarBatch or InternalRow + */ + void updateByValue(Object value); } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 7316574..67d75bd 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -307,6 +307,7 @@ class CarbonScanRDD( } havePair = false val value = reader.getCurrentValue + inputMetricsStats.updateByValue(value) value } http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 3acedab..eba0787 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -165,7 +165,9 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { if (returnColumnarBatch) { int value = columnarBatch.numValidRows(); rowCount += value; - inputMetricsStats.incrementRecordRead((long)value); + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead((long) value); + } return columnarBatch; } rowCount += 1; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala index b562ebc..deef157 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/CarbonInputMetrics.scala @@ -20,6 +20,7 @@ import java.lang.Long import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.InputMetrics +import org.apache.spark.sql.execution.vectorized.ColumnarBatch import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.util.TaskMetricsMap @@ -75,4 +76,12 @@ class CarbonInputMetrics extends InitInputMetrics{ } } } + + override def updateByValue(value: Object): Unit = { + value match { + case batch: ColumnarBatch => + inputMetrics.incRecordsRead(batch.numRows()) + case _ => + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/77217b37/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 57233cf..e5de052 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -79,7 +79,8 @@ case class CarbonDatasourceHadoopRelation( filterExpression.orNull, identifier, carbonTable.getTableInfo.serialize(), - carbonTable.getTableInfo, inputMetricsStats) + carbonTable.getTableInfo, + 
inputMetricsStats) } override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
