Repository: carbondata
Updated Branches:
  refs/heads/master 936037056 -> eae9064a5


[CARBONDATA-2185] Add InputMetrics for Streaming Reader

This closes #1985


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eae9064a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eae9064a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eae9064a

Branch: refs/heads/master
Commit: eae9064a5d42c4d321bcbbeb4355745b3ff44a06
Parents: 9360370
Author: BJangir <babulaljangir...@gmail.com>
Authored: Mon Feb 19 22:31:00 2018 +0530
Committer: QiangCai <qiang...@qq.com>
Committed: Fri Feb 23 19:21:09 2018 +0800

----------------------------------------------------------------------
 .../streaming/CarbonStreamRecordReader.java       | 18 ++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala      |  1 +
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eae9064a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index 773089b..95a7af0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -58,6 +58,7 @@ import org.apache.carbondata.format.BlockletHeader;
 import org.apache.carbondata.format.FileHeader;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -137,6 +138,9 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
   // return raw row for handoff
   private boolean useRawRow = false;
 
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+
   @Override public void initialize(InputSplit split, TaskAttemptContext 
context)
       throws IOException, InterruptedException {
     // input
@@ -392,8 +396,18 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
 
   @Override public Object getCurrentValue() throws IOException, 
InterruptedException {
     if (isVectorReader) {
+      int value = columnarBatch.numValidRows();
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
+
       return columnarBatch;
     }
+
+    if (inputMetricsStats != null) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+
     return outputRow;
   }
 
@@ -730,6 +744,10 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     this.isVectorReader = isVectorReader;
   }
 
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
   @Override public void close() throws IOException {
     if (null != input) {
       input.close();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eae9064a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e554a58..8c29c2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -345,6 +345,7 @@ class CarbonScanRDD(
           val streamReader = inputFormat.createRecordReader(inputSplit, 
attemptContext)
             .asInstanceOf[CarbonStreamRecordReader]
           streamReader.setVectorReader(vectorReader)
+          streamReader.setInputMetricsStats(inputMetricsStats)
           model.setStatisticsRecorder(
             
CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
           streamReader.setQueryModel(model)

Reply via email to