Repository: carbondata Updated Branches: refs/heads/master 936037056 -> eae9064a5
[CARBONDATA-2185] Add InputMetrics for Streaming Reader This closes #1985 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eae9064a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eae9064a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eae9064a Branch: refs/heads/master Commit: eae9064a5d42c4d321bcbbeb4355745b3ff44a06 Parents: 9360370 Author: BJangir <babulaljangir...@gmail.com> Authored: Mon Feb 19 22:31:00 2018 +0530 Committer: QiangCai <qiang...@qq.com> Committed: Fri Feb 23 19:21:09 2018 +0800 ---------------------------------------------------------------------- .../streaming/CarbonStreamRecordReader.java | 18 ++++++++++++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 1 + 2 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/eae9064a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java index 773089b..95a7af0 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java @@ -58,6 +58,7 @@ import org.apache.carbondata.format.BlockletHeader; import org.apache.carbondata.format.FileHeader; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.InputMetricsStats; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.carbondata.hadoop.util.CarbonTypeUtil; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -137,6 +138,9 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { // return raw row for handoff private boolean useRawRow = false; + // InputMetricsStats + private InputMetricsStats inputMetricsStats; + @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // input @@ -392,8 +396,18 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { @Override public Object getCurrentValue() throws IOException, InterruptedException { if (isVectorReader) { + int value = columnarBatch.numValidRows(); + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead((long) value); + } + return columnarBatch; } + + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead(1L); + } + return outputRow; } @@ -730,6 +744,10 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { this.isVectorReader = isVectorReader; } + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + @Override public void close() throws IOException { if (null != input) { input.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/eae9064a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index e554a58..8c29c2a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -345,6 +345,7 @@ class CarbonScanRDD( val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) .asInstanceOf[CarbonStreamRecordReader] streamReader.setVectorReader(vectorReader) + streamReader.setInputMetricsStats(inputMetricsStats) model.setStatisticsRecorder( CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) streamReader.setQueryModel(model)