Repository: hive
Updated Branches:
  refs/heads/master 2028749b1 -> d682ca926


HIVE-18652: Print Spark metrics on console (Sahil Takiar, reviewed by Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d682ca92
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d682ca92
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d682ca92

Branch: refs/heads/master
Commit: d682ca9266df182e977b35ab47771dbac27777ec
Parents: 2028749
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Mon Jun 4 13:36:04 2018 -0500
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Mon Jun 4 13:36:48 2018 -0500

----------------------------------------------------------------------
 .../hive/ql/exec/spark/TestSparkStatistics.java |  2 +-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 83 +++++++++++++++++++-
 .../spark/Statistic/SparkStatisticGroup.java    |  4 +
 .../spark/Statistic/SparkStatisticsNames.java   | 25 ++++--
 .../spark/status/impl/SparkMetricsUtils.java    | 37 ++++++---
 .../hive/spark/client/MetricsCollection.java    | 20 ++++-
 .../hive/spark/client/metrics/InputMetrics.java | 12 ++-
 .../client/metrics/ShuffleReadMetrics.java      | 21 ++++-
 .../client/metrics/ShuffleWriteMetrics.java     | 11 ++-
 .../spark/client/TestMetricsCollection.java     | 15 ++--
 10 files changed, 190 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
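
For illustration only, here is roughly what the new console summary looks like.
The format follows the string built in printConsoleMetrics() below; the job id,
all metric values, and the exact label spellings (taken from the
SparkStatisticsNames constants) are hypothetical, and the console prints this
as a single line (wrapped here for readability):

  Spark Job[1] Metrics: TaskDurationTime: 54321, ExecutorCpuTime: 31000,
  JvmGCTime: 1200, BytesRead / RecordsRead: 1048576 / 5000,
  ShuffleTotalBytesRead / ShuffleRecordsRead: 262144 / 1200,
  ShuffleBytesWritten / ShuffleRecordsWritten: 131072 / 600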


http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index 4413161..f6c5b17 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -81,7 +81,7 @@ public class TestSparkStatistics {
      List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics()
              .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics());
 
-      Assert.assertEquals(18, sparkStats.size());
+      Assert.assertEquals(24, sparkStats.size());
 
       Map<String, String> statsMap = sparkStats.stream().collect(
              Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue));

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 8038771..ddbb6ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -35,6 +35,7 @@ import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,6 +162,7 @@ public class SparkTask extends Task<SparkWork> {
 
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
+        printConsoleMetrics();
         printExcessiveGCWarning();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID));
@@ -222,6 +224,79 @@ public class SparkTask extends Task<SparkWork> {
     return rc;
   }
 
+  private void printConsoleMetrics() {
+    SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
+            SparkStatisticsNames.SPARK_GROUP_NAME);
+
+    if (sparkStatisticGroup != null) {
+      String colon = ": ";
+      String forwardSlash = " / ";
+      String separator = ", ";
+
+      String metricsString = String.format("Spark Job[%d] Metrics: ", sparkJobID);
+
+      // Task Duration Time
+      if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.TASK_DURATION_TIME)) {
+        metricsString += SparkStatisticsNames.TASK_DURATION_TIME + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.TASK_DURATION_TIME) + separator;
+      }
+
+      // Executor CPU Time
+      if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.EXECUTOR_CPU_TIME)) {
+        metricsString += SparkStatisticsNames.EXECUTOR_CPU_TIME + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.EXECUTOR_CPU_TIME) + separator;
+      }
+
+      // JVM GC Time
+      if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.JVM_GC_TIME)) {
+        metricsString += SparkStatisticsNames.JVM_GC_TIME + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.JVM_GC_TIME) + separator;
+      }
+
+      // Bytes Read / Records Read
+      if (sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.BYTES_READ) &&
+              sparkStatisticGroup.containsSparkStatistic(SparkStatisticsNames.RECORDS_READ)) {
+        metricsString += SparkStatisticsNames.BYTES_READ + forwardSlash +
+                SparkStatisticsNames.RECORDS_READ + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.BYTES_READ) + forwardSlash +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.RECORDS_READ) + separator;
+      }
+
+      // Shuffle Read Bytes / Shuffle Read Records
+      if (sparkStatisticGroup.containsSparkStatistic(
+              SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ) &&
+              sparkStatisticGroup.containsSparkStatistic(
+                      SparkStatisticsNames.SHUFFLE_RECORDS_READ)) {
+        metricsString += SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ + forwardSlash +
+                SparkStatisticsNames.SHUFFLE_RECORDS_READ + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ) + forwardSlash +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.SHUFFLE_RECORDS_READ) + separator;
+      }
+
+      // Shuffle Write Bytes / Shuffle Write Records
+      if (sparkStatisticGroup.containsSparkStatistic(
+              SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN) &&
+              sparkStatisticGroup.containsSparkStatistic(
+                      SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN)) {
+        metricsString += SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN + forwardSlash +
+                SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN + colon +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN) + forwardSlash +
+                SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+                        SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN);
+      }
+
+      console.printInfo(metricsString);
+    }
+  }
+
   /**
   * Use the Spark metrics and calculate how much task execution time was spent performing GC
   * operations. If more than a defined threshold of time is spent, print out a warning on the
@@ -231,10 +306,10 @@ public class SparkTask extends Task<SparkWork> {
     SparkStatisticGroup sparkStatisticGroup = sparkStatistics.getStatisticGroup(
             SparkStatisticsNames.SPARK_GROUP_NAME);
     if (sparkStatisticGroup != null) {
-      long taskDurationTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
-              SparkStatisticsNames.TASK_DURATION_TIME).getValue());
-      long jvmGCTime = Long.parseLong(sparkStatisticGroup.getSparkStatistic(
-              SparkStatisticsNames.JVM_GC_TIME).getValue());
+      long taskDurationTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.TASK_DURATION_TIME);
+      long jvmGCTime = SparkMetricsUtils.getSparkStatisticAsLong(sparkStatisticGroup,
+              SparkStatisticsNames.JVM_GC_TIME);
 
       // Threshold percentage to trigger the GC warning
       double threshold = 0.1;

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
index e1006e3..d5d628e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java
@@ -47,4 +47,8 @@ public class SparkStatisticGroup {
   public SparkStatistic getSparkStatistic(String name) {
     return this.statistics.get(name);
   }
+
+  public boolean containsSparkStatistic(String name) {
+    return this.statistics.containsKey(name);
+  }
 }
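
A minimal usage sketch for the new guard (the group variable is a hypothetical
placeholder; getSparkStatisticAsLong() is the helper added to SparkMetricsUtils
further below):

  // Only read a statistic that was actually collected: getSparkStatistic()
  // returns null for an unknown name, so an unguarded read would fail with a
  // NullPointerException inside getSparkStatisticAsLong().
  if (group.containsSparkStatistic(SparkStatisticsNames.JVM_GC_TIME)) {
    long jvmGCTime = SparkMetricsUtils.getSparkStatisticAsLong(group,
            SparkStatisticsNames.JVM_GC_TIME);
    // ... use jvmGCTime
  }

This is the same pattern printConsoleMetrics() in SparkTask.java applies before
appending each metric.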

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
index 68e4f9e..12c3eac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java
@@ -31,15 +31,28 @@ public class SparkStatisticsNames {
   public static final String RESULT_SERIALIZATION_TIME = "ResultSerializationTime";
   public static final String MEMORY_BYTES_SPILLED = "MemoryBytesSpilled";
   public static final String DISK_BYTES_SPILLED = "DiskBytesSpilled";
+
+  public static final String TASK_DURATION_TIME = "TaskDurationTime";
+
+  // Input Metrics
   public static final String BYTES_READ = "BytesRead";
-  public static final String REMOTE_BLOCKS_FETCHED = "RemoteBlocksFetched";
-  public static final String LOCAL_BLOCKS_FETCHED = "LocalBlocksFetched";
-  public static final String TOTAL_BLOCKS_FETCHED = "TotalBlocksFetched";
-  public static final String FETCH_WAIT_TIME = "FetchWaitTime";
-  public static final String REMOTE_BYTES_READ = "RemoteBytesRead";
+  public static final String RECORDS_READ = "RecordsRead";
+
+  // Shuffle Read Metrics
+  public static final String SHUFFLE_FETCH_WAIT_TIME = "ShuffleFetchWaitTime";
+  public static final String SHUFFLE_REMOTE_BYTES_READ = "ShuffleRemoteBytesRead";
+  public static final String SHUFFLE_LOCAL_BYTES_READ = "ShuffleLocalBytesRead";
+  public static final String SHUFFLE_TOTAL_BYTES_READ = "ShuffleTotalBytesRead";
+  public static final String SHUFFLE_REMOTE_BLOCKS_FETCHED = "ShuffleRemoteBlocksFetched";
+  public static final String SHUFFLE_LOCAL_BLOCKS_FETCHED = "ShuffleLocalBlocksFetched";
+  public static final String SHUFFLE_TOTAL_BLOCKS_FETCHED = "ShuffleTotalBlocksFetched";
+  public static final String SHUFFLE_REMOTE_BYTES_READ_TO_DISK = "ShuffleRemoteBytesReadToDisk";
+  public static final String SHUFFLE_RECORDS_READ = "ShuffleRecordsRead";
+
+  // Shuffle Write Metrics
   public static final String SHUFFLE_BYTES_WRITTEN = "ShuffleBytesWritten";
   public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime";
-  public static final String TASK_DURATION_TIME = "TaskDurationTime";
+  public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten";
 
   public static final String SPARK_GROUP_NAME = "SPARK";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
index fab5422..c73c150 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java
@@ -20,45 +20,58 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 
-final class SparkMetricsUtils {
+public final class SparkMetricsUtils {
 
   private SparkMetricsUtils(){}
 
   static Map<String, Long> collectMetrics(Metrics allMetrics) {
     Map<String, Long> results = new LinkedHashMap<String, Long>();
+    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
+    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
+    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
+    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
+    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
+    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_TIME, allMetrics.executorDeserializeTime);
     results.put(SparkStatisticsNames.EXECUTOR_DESERIALIZE_CPU_TIME,
             allMetrics.executorDeserializeCpuTime);
-    results.put(SparkStatisticsNames.EXECUTOR_RUN_TIME, allMetrics.executorRunTime);
-    results.put(SparkStatisticsNames.EXECUTOR_CPU_TIME, allMetrics.executorCpuTime);
     results.put(SparkStatisticsNames.RESULT_SIZE, allMetrics.resultSize);
-    results.put(SparkStatisticsNames.JVM_GC_TIME, allMetrics.jvmGCTime);
     results.put(SparkStatisticsNames.RESULT_SERIALIZATION_TIME, allMetrics.resultSerializationTime);
-    results.put(SparkStatisticsNames.MEMORY_BYTES_SPILLED, allMetrics.memoryBytesSpilled);
-    results.put(SparkStatisticsNames.DISK_BYTES_SPILLED, allMetrics.diskBytesSpilled);
-    results.put(SparkStatisticsNames.TASK_DURATION_TIME, allMetrics.taskDurationTime);
     if (allMetrics.inputMetrics != null) {
       results.put(SparkStatisticsNames.BYTES_READ, allMetrics.inputMetrics.bytesRead);
+      results.put(SparkStatisticsNames.RECORDS_READ, allMetrics.inputMetrics.recordsRead);
     }
     if (allMetrics.shuffleReadMetrics != null) {
       ShuffleReadMetrics shuffleReadMetrics = allMetrics.shuffleReadMetrics;
       long rbf = shuffleReadMetrics.remoteBlocksFetched;
       long lbf = shuffleReadMetrics.localBlocksFetched;
-      results.put(SparkStatisticsNames.REMOTE_BLOCKS_FETCHED, rbf);
-      results.put(SparkStatisticsNames.LOCAL_BLOCKS_FETCHED, lbf);
-      results.put(SparkStatisticsNames.TOTAL_BLOCKS_FETCHED, rbf + lbf);
-      results.put(SparkStatisticsNames.FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
-      results.put(SparkStatisticsNames.REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BYTES_READ,
+              shuffleReadMetrics.remoteBytesRead + shuffleReadMetrics.localBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ, shuffleReadMetrics.remoteBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BYTES_READ, shuffleReadMetrics.localBytesRead);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BYTES_READ_TO_DISK, shuffleReadMetrics
+              .remoteBytesReadToDisk);
+      results.put(SparkStatisticsNames.SHUFFLE_RECORDS_READ, shuffleReadMetrics.recordsRead);
+      results.put(SparkStatisticsNames.SHUFFLE_TOTAL_BLOCKS_FETCHED, rbf + lbf);
+      results.put(SparkStatisticsNames.SHUFFLE_REMOTE_BLOCKS_FETCHED, rbf);
+      results.put(SparkStatisticsNames.SHUFFLE_LOCAL_BLOCKS_FETCHED, lbf);
+      results.put(SparkStatisticsNames.SHUFFLE_FETCH_WAIT_TIME, shuffleReadMetrics.fetchWaitTime);
     }
     if (allMetrics.shuffleWriteMetrics != null) {
       results.put(SparkStatisticsNames.SHUFFLE_BYTES_WRITTEN, allMetrics.shuffleWriteMetrics.shuffleBytesWritten);
+      results.put(SparkStatisticsNames.SHUFFLE_RECORDS_WRITTEN,
+              allMetrics.shuffleWriteMetrics.shuffleRecordsWritten);
       results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime);
     }
     return results;
   }
 
+  public static long getSparkStatisticAsLong(SparkStatisticGroup group, String name) {
+    return Long.parseLong(group.getSparkStatistic(name).getValue());
+  }
 }
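
A side note on the reordered put() calls above, with a small self-contained
sketch (names and values hypothetical): results is a LinkedHashMap, which
iterates in insertion order, so one visible effect of moving TaskDurationTime,
ExecutorCpuTime, etc. to the top is that they come out first wherever the map
is walked:

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class InsertionOrderDemo {
    public static void main(String[] args) {
      Map<String, Long> results = new LinkedHashMap<>();
      results.put("TaskDurationTime", 54321L);  // inserted first ...
      results.put("ExecutorCpuTime", 31000L);
      results.put("JvmGCTime", 1200L);
      // ... printed first: iteration follows insertion order.
      results.forEach((name, value) -> System.out.println(name + ": " + value));
    }
  }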

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index 2f3c026..a0db015 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -154,6 +154,7 @@ public class MetricsCollection {
       // Input metrics.
       boolean hasInputMetrics = false;
       long bytesRead = 0L;
+      long recordsRead = 0L;
 
       // Shuffle read metrics.
       boolean hasShuffleReadMetrics = false;
@@ -161,10 +162,14 @@ public class MetricsCollection {
       int localBlocksFetched = 0;
       long fetchWaitTime = 0L;
       long remoteBytesRead = 0L;
+      long localBytesRead = 0L;
+      long remoteBytesReadToDisk = 0L;
+      long shuffleRecordsRead = 0L;
 
       // Shuffle write metrics.
       long shuffleBytesWritten = 0L;
       long shuffleWriteTime = 0L;
+      long shuffleRecordsWritten = 0L;
 
       for (TaskInfo info : Collections2.filter(taskMetrics, filter)) {
         Metrics m = info.metrics;
@@ -182,6 +187,7 @@ public class MetricsCollection {
         if (m.inputMetrics != null) {
           hasInputMetrics = true;
           bytesRead += m.inputMetrics.bytesRead;
+          recordsRead += m.inputMetrics.recordsRead;
         }
 
         if (m.shuffleReadMetrics != null) {
@@ -190,17 +196,21 @@ public class MetricsCollection {
           localBlocksFetched += m.shuffleReadMetrics.localBlocksFetched;
           fetchWaitTime += m.shuffleReadMetrics.fetchWaitTime;
           remoteBytesRead += m.shuffleReadMetrics.remoteBytesRead;
+          localBytesRead += m.shuffleReadMetrics.localBytesRead;
+          remoteBytesReadToDisk += m.shuffleReadMetrics.remoteBytesReadToDisk;
+          shuffleRecordsRead += m.shuffleReadMetrics.recordsRead;
         }
 
         if (m.shuffleWriteMetrics != null) {
           shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
           shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
+          shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten;
         }
       }
 
       InputMetrics inputMetrics = null;
       if (hasInputMetrics) {
-        inputMetrics = new InputMetrics(bytesRead);
+        inputMetrics = new InputMetrics(bytesRead, recordsRead);
       }
 
       ShuffleReadMetrics shuffleReadMetrics = null;
@@ -209,14 +219,18 @@ public class MetricsCollection {
           remoteBlocksFetched,
           localBlocksFetched,
           fetchWaitTime,
-          remoteBytesRead);
+          remoteBytesRead,
+          localBytesRead,
+          remoteBytesReadToDisk,
+          shuffleRecordsRead);
       }
 
       ShuffleWriteMetrics shuffleWriteMetrics = null;
       if (hasShuffleReadMetrics) {
         shuffleWriteMetrics = new ShuffleWriteMetrics(
           shuffleBytesWritten,
-          shuffleWriteTime);
+          shuffleWriteTime,
+          shuffleRecordsWritten);
       }
 
       return new Metrics(

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index 6a13071..a162f48 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,20 +28,26 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class InputMetrics implements Serializable {
+  /** Total number of bytes read. */
   public final long bytesRead;
+  /** Total number of records read. */
+  public final long recordsRead;
 
   private InputMetrics() {
     // For Serialization only.
-    this(0L);
+    this(0L, 0L);
   }
 
   public InputMetrics(
-      long bytesRead) {
+      long bytesRead,
+      long recordsRead) {
     this.bytesRead = bytesRead;
+    this.recordsRead = recordsRead;
   }
 
   public InputMetrics(TaskMetrics metrics) {
-    this(metrics.inputMetrics().bytesRead());
+    this(metrics.inputMetrics().bytesRead(),
+      metrics.inputMetrics().recordsRead());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index e3d564f..ec71136 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -42,28 +42,43 @@ public class ShuffleReadMetrics implements Serializable {
   public final long fetchWaitTime;
   /** Total number of remote bytes read from the shuffle by tasks. */
   public final long remoteBytesRead;
+  /** Shuffle data that was read from the local disk (as opposed to from a remote executor). */
+  public final long localBytesRead;
+  /** Total number of remote bytes read to disk from the shuffle by this task. */
+  public final long remoteBytesReadToDisk;
+  /** Total number of records read from the shuffle by this task. */
+  public final long recordsRead;
 
   private ShuffleReadMetrics() {
     // For Serialization only.
-    this(0, 0, 0L, 0L);
+    this(0, 0, 0L, 0L, 0L, 0L, 0L);
   }
 
   public ShuffleReadMetrics(
       long remoteBlocksFetched,
       long localBlocksFetched,
       long fetchWaitTime,
-      long remoteBytesRead) {
+      long remoteBytesRead,
+      long localBytesRead,
+      long remoteBytesReadToDisk,
+      long recordsRead) {
     this.remoteBlocksFetched = remoteBlocksFetched;
     this.localBlocksFetched = localBlocksFetched;
     this.fetchWaitTime = fetchWaitTime;
     this.remoteBytesRead = remoteBytesRead;
+    this.localBytesRead = localBytesRead;
+    this.remoteBytesReadToDisk = remoteBytesReadToDisk;
+    this.recordsRead = recordsRead;
   }
 
   public ShuffleReadMetrics(TaskMetrics metrics) {
     this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
       metrics.shuffleReadMetrics().localBlocksFetched(),
       metrics.shuffleReadMetrics().fetchWaitTime(),
-      metrics.shuffleReadMetrics().remoteBytesRead());
+      metrics.shuffleReadMetrics().remoteBytesRead(),
+      metrics.shuffleReadMetrics().localBytesRead(),
+      metrics.shuffleReadMetrics().remoteBytesReadToDisk(),
+      metrics.shuffleReadMetrics().recordsRead());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index e9cf6a1..781bf53 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -33,22 +33,27 @@ public class ShuffleWriteMetrics implements Serializable {
   public final long shuffleBytesWritten;
   /** Time tasks spent blocking on writes to disk or buffer cache, in nanoseconds. */
   public final long shuffleWriteTime;
+  /** Total number of records written to the shuffle by this task. */
+  public final long shuffleRecordsWritten;
 
   private ShuffleWriteMetrics() {
     // For Serialization only.
-    this(0L, 0L);
+    this(0L, 0L, 0L);
   }
 
   public ShuffleWriteMetrics(
       long shuffleBytesWritten,
-      long shuffleWriteTime) {
+      long shuffleWriteTime,
+      long shuffleRecordsWritten) {
     this.shuffleBytesWritten = shuffleBytesWritten;
     this.shuffleWriteTime = shuffleWriteTime;
+    this.shuffleRecordsWritten = shuffleRecordsWritten;
   }
 
   public ShuffleWriteMetrics(TaskMetrics metrics) {
     this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
-      metrics.shuffleWriteMetrics().shuffleWriteTime());
+      metrics.shuffleWriteMetrics().shuffleWriteTime(),
+      metrics.shuffleWriteMetrics().shuffleRecordsWritten());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d682ca92/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index c5884cf..2d4c43d 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -96,9 +96,9 @@ public class TestMetricsCollection {
     long value = taskValue(1, 1, 1);
 
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value,
-            value, new InputMetrics(value), null, null);
+            value, new InputMetrics(value, value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value,
-            value, new InputMetrics(value), null, null);
+            value, new InputMetrics(value, value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
@@ -110,9 +110,9 @@ public class TestMetricsCollection {
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value, value, value, value,
-      new InputMetrics(value),
-      new ShuffleReadMetrics((int) value, (int) value, value, value),
-      new ShuffleWriteMetrics(value, value));
+      new InputMetrics(value, value),
+      new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value),
+      new ShuffleWriteMetrics(value, value, value));
   }
 
   /**
@@ -160,14 +160,19 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.taskDurationTime);
 
     assertEquals(expected, metrics.inputMetrics.bytesRead);
+    assertEquals(expected, metrics.inputMetrics.recordsRead);
 
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);
     assertEquals(expected, metrics.shuffleReadMetrics.localBlocksFetched);
     assertEquals(expected, metrics.shuffleReadMetrics.fetchWaitTime);
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.localBytesRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.recordsRead);
+    assertEquals(expected, metrics.shuffleReadMetrics.remoteBytesReadToDisk);
 
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten);
     assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime);
+    assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten);
   }
 
 }
