HIVE-10913 : LLAP: cache QF counters have a wrong value for consumer time 
(Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f032e548
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f032e548
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f032e548

Branch: refs/heads/llap
Commit: f032e5488cc50f91c00b9c06ecc49b21146eb591
Parents: 5412707
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Jun 3 14:50:21 2015 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 3 14:50:21 2015 -0700

----------------------------------------------------------------------
 .../hive/llap/counters/QueryFragmentCounters.java    | 15 ++++++++++-----
 .../hive/llap/io/api/impl/LlapInputFormat.java       | 14 ++++++++++----
 .../hive/llap/io/encoded/OrcEncodedDataReader.java   |  2 ++
 3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f032e548/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index 7658b03..d7825a7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -48,11 +48,14 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
   }
 
   public static enum Desc {
-    TABLE
+    MACHINE,
+    TABLE,
+    FILE,
+    STRIPES
   }
 
   private final AtomicLongArray fixedCounters;
-  private final String[] descs;
+  private final Object[] descs;
 
   public QueryFragmentCounters(Configuration conf) {
     fixedCounters = new AtomicLongArray(Counter.values().length);
@@ -62,7 +65,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
       setCounter(Counter.TOTAL_IO_TIME_US, -1);
       setCounter(Counter.DECODE_TIME_US, -1);
       setCounter(Counter.HDFS_TIME_US, -1);
-      setCounter(Counter.HDFS_TIME_US, -1);
+      setCounter(Counter.CONSUMER_TIME_US, -1);
     }
   }
 
@@ -88,7 +91,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
     fixedCounters.set(counter.ordinal(), value);
   }
 
-  public void setDesc(Desc key, String desc) {
+  public void setDesc(Desc key, Object desc) {
     descs[key.ordinal()] = desc;
   }
 
@@ -122,7 +125,9 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
       if (i != 0) {
         sb.append(", ");
       }
-      sb.append(descs[i]);
+      if (descs[i] != null) {
+        sb.append(descs[i]);
+      }
     }
     sb.append("]: [ ");
     for (int i = 0; i < fixedCounters.length(); ++i) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f032e548/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index a4f69da..e90dbbc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.common.util.HiveStringUtils;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -59,6 +60,7 @@ public class LlapInputFormat
   private final InputFormat sourceInputFormat;
   private final ColumnVectorProducer cvp;
   private final ListeningExecutorService executor;
+  private final String hostName;
 
   @SuppressWarnings("rawtypes")
   LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
@@ -69,6 +71,7 @@ public class LlapInputFormat
     this.executor = executor;
     this.cvp = cvp;
     this.sourceInputFormat = sourceInputFormat;
+    this.hostName = HiveStringUtils.getHostname();
   }
 
   @Override
@@ -84,7 +87,7 @@ public class LlapInputFormat
     try {
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      return new LlapRecordReader(job, fileSplit, includedCols);
+      return new LlapRecordReader(job, fileSplit, includedCols, hostName);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
@@ -114,12 +117,14 @@ public class LlapInputFormat
     private final QueryFragmentCounters counters;
     private long firstReturnTime;
 
-    public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols) {
+    public LlapRecordReader(
+        JobConf job, FileSplit split, List<Integer> includedCols, String hostName) {
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = SearchArgumentFactory.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
       this.counters = new QueryFragmentCounters(job);
+      this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(job, split);
@@ -136,6 +141,7 @@ public class LlapInputFormat
         throw new AssertionError("next called after close");
       }
      // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+      boolean wasFirst = isFirst;
       if (isFirst) {
         try {
           rbCtx.addPartitionColsToBatch(value);
@@ -153,7 +159,7 @@ public class LlapInputFormat
         throw new IOException(e);
       }
       if (cvb == null) {
-        if (isFirst) {
+        if (wasFirst) {
           firstReturnTime = counters.startTimeCounter();
         }
         counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime);
@@ -170,7 +176,7 @@ public class LlapInputFormat
       }
       value.selectedInUse = false;
       value.size = cvb.size;
-      if (isFirst) {
+      if (wasFirst) {
         firstReturnTime = counters.startTimeCounter();
       }
       return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/f032e548/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 6d2a38e..e107378 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -143,6 +143,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fs = split.getPath().getFileSystem(conf);
     fileId = determineFileId(fs, split);
+    counters.setDesc(QueryFragmentCounters.Desc.FILE, fileId);
 
     try {
       fileMetadata = getOrReadFileMetadata();
@@ -165,6 +166,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       recordReaderTime(startTime);
       return null; // No data to read.
     }
+    counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length);
 
     // 3. Apply SARG if needed, and otherwise determine what RGs to read.
     int stride = fileMetadata.getRowIndexStride();

Reply via email to