result_size in query statistics does not give a valid row count when the vector reader is enabled.
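Root cause, summarized from the diff below: with the vector reader enabled, CarbonScanRDD counted calls to reader.getCurrentValue, but each such call returns a whole ColumnarBatch rather than a single row, so RESULT_SIZE recorded the number of batches instead of the number of rows. The fix moves the counting into the record readers themselves, which know whether a value is one row or one batch. A schematic Java sketch of the mismatch follows; the driving loop, class, and counters are invented for illustration, and only the Hadoop RecordReader API and Spark's ColumnarBatch are real:

import java.io.IOException;

import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

// Schematic illustration, not CarbonData code: counting getCurrentValue()
// calls undercounts rows once each value is a batch.
final class ResultSizeSketch {
  static long[] batchesVersusRows(RecordReader<Void, Object> reader)
      throws IOException, InterruptedException {
    long batches = 0; // what the old CarbonScanRDD counter effectively measured
    long rows = 0;    // what RESULT_SIZE is supposed to report
    while (reader.nextKeyValue()) {
      Object value = reader.getCurrentValue();
      batches += 1;
      if (value instanceof ColumnarBatch) {
        rows += ((ColumnarBatch) value).numValidRows(); // many rows per call
      } else {
        rows += 1; // a row-based reader hands out one row per call
      }
    }
    // e.g. 100000 rows in batches of 1024 give batches == 98 but rows == 100000
    return new long[] { batches, rows };
  }
}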
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ec2d742f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ec2d742f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ec2d742f

Branch: refs/heads/branch-1.1
Commit: ec2d742f2e479e40883b92df014a8b260d50e526
Parents: f4fc651
Author: nareshpr <[email protected]>
Authored: Wed Apr 12 19:53:21 2017 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Apr 13 16:14:09 2017 +0530

----------------------------------------------------------------------
 .../carbondata/hadoop/AbstractRecordReader.java | 45 ++++++++++++++++++++
 .../carbondata/hadoop/CarbonRecordReader.java   |  5 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 13 +-----
 .../VectorizedCarbonRecordReader.java           | 11 +++--
 4 files changed, 58 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
new file mode 100644
index 0000000..e571ccf
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop;
+
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * This class will have all the common methods for vector and row based reader
+ */
+public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> {
+
+  protected int rowCount = 0;
+
+  /**
+   * This method will log query result count and querytime
+   * @param recordCount
+   * @param recorder
+   */
+  public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
+    // result size
+    QueryStatistic queryStatistic = new QueryStatistic();
+    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount);
+    recorder.recordStatistics(queryStatistic);
+    // print executor query statistics for each task_id
+    recorder.logStatisticsAsTableExecutor();
+  }
+}
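For orientation, a minimal hypothetical subclass showing the contract the new base class sets up: rowCount accumulates as values are handed out, and logStatistics flushes it exactly once when the reader closes. Only rowCount, logStatistics, and the imported types come from the code above; the class name, backing iterator, and constructor are invented for the sketch. CarbonRecordReader below follows the same pattern.

import java.io.IOException;
import java.util.Iterator;

import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.hadoop.AbstractRecordReader;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical row-based reader built on the new base class.
public class ExampleRowReader extends AbstractRecordReader<Object> {

  private final Iterator<Object> rows;
  private final QueryStatisticsRecorder recorder;
  private Object current;

  public ExampleRowReader(Iterator<Object> rows, QueryStatisticsRecorder recorder) {
    this.rows = rows;
    this.recorder = recorder;
  }

  @Override public void initialize(InputSplit split, TaskAttemptContext context) {
    // nothing to set up in this sketch
  }

  @Override public boolean nextKeyValue() {
    if (!rows.hasNext()) {
      return false;
    }
    current = rows.next();
    return true;
  }

  @Override public Void getCurrentKey() {
    return null; // the key is unused, as in the Carbon readers
  }

  @Override public Object getCurrentValue() {
    rowCount += 1; // a row-based reader hands out exactly one row per call
    return current;
  }

  @Override public float getProgress() {
    return 0;
  }

  @Override public void close() throws IOException {
    // RESULT_SIZE is recorded once, with the accumulated total
    logStatistics(rowCount, recorder);
  }
}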
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 27c8b2f..26b269a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -33,13 +33,12 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * Reads the data from Carbon store.
  */
-public class CarbonRecordReader<T> extends RecordReader<Void, T> {
+public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
 
   private QueryModel queryModel;
 
@@ -92,6 +91,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
   }
 
   @Override public T getCurrentValue() throws IOException, InterruptedException {
+    rowCount += 1;
     return readSupport.readRow(carbonIterator.next());
   }
 
@@ -101,6 +101,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
   }
 
   @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
     // clear dictionary cache
     Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
     if (null != columnToDictionaryMapping) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index ab0d603..4807b90 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -207,10 +207,9 @@ class CarbonScanRDD(
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
-        private var count = 0
 
         context.addTaskCompletionListener { context =>
-          logStatistics(queryStartTime, count, model.getStatisticsRecorder)
+          logStatistics(queryStartTime, model.getStatisticsRecorder)
           reader.close()
         }
 
@@ -231,7 +230,6 @@ class CarbonScanRDD(
           }
           havePair = false
           val value = reader.getCurrentValue
-          count += 1
           value
         }
       }
@@ -265,18 +263,11 @@ class CarbonScanRDD(
     format
   }
 
-  def logStatistics(queryStartTime: Long, recordCount: Int,
-      recorder: QueryStatisticsRecorder): Unit = {
+  def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): Unit = {
     var queryStatistic = new QueryStatistic()
    queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
      System.currentTimeMillis - queryStartTime)
    recorder.recordStatistics(queryStatistic)
-    // result size
-    queryStatistic = new QueryStatistic()
-    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount)
-    recorder.recordStatistics(queryStatistic)
-    // print executor query statistics for each task_id
-    recorder.logStatisticsAsTableExecutor()
   }
 
   /**
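A design note on this Scala change, as read from the diff: the iterator-level count is gone, but the task-completion listener still runs logStatistics for the EXECUTOR_PART timing and then calls reader.close(). It is that close() call which now records RESULT_SIZE, from inside whichever AbstractRecordReader subclass the task used, so the reported count reflects rows actually produced rather than getCurrentValue() invocations.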
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ec2d742f/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ffff956..3fdf9af 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -38,12 +38,12 @@ import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResult
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.spark.util.CarbonScalaUtil;
 
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
  * carbondata column APIs and fills the data directly into columns.
  */
-class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
   private int batchIdx = 0;
 
@@ -116,6 +116,7 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   }
 
   @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
     if (columnarBatch != null) {
       columnarBatch.close();
       columnarBatch = null;
@@ -147,7 +148,11 @@ class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   }
 
   @Override public Object getCurrentValue() throws IOException, InterruptedException {
-    if (returnColumnarBatch) return columnarBatch;
+    if (returnColumnarBatch) {
+      rowCount += columnarBatch.numValidRows();
+      return columnarBatch;
+    }
+    rowCount += 1;
     return columnarBatch.getRow(batchIdx - 1);
   }
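The batch path adds numValidRows() per ColumnarBatch rather than counting the call itself, so rows filtered out of a batch are, as the name suggests, not counted toward RESULT_SIZE. A hedged sketch of the invariant this establishes; the draining loop, class, and expected variable are invented, while numValidRows() is the same Spark API used in the diff above:

import java.io.IOException;

import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

// Hypothetical check: after draining a vectorized reader in batch mode,
// the rowCount logged by close() should equal the valid rows summed here.
final class VectorCountSketch {
  static long drainBatches(RecordReader<Void, Object> reader)
      throws IOException, InterruptedException {
    long expected = 0;
    while (reader.nextKeyValue()) {
      ColumnarBatch batch = (ColumnarBatch) reader.getCurrentValue();
      expected += batch.numValidRows(); // same quantity the reader adds to rowCount
    }
    reader.close(); // logs RESULT_SIZE, which should equal 'expected'
    return expected;
  }
}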
