Repository: hive
Updated Branches:
  refs/heads/master 68bdf9eb4 -> 14bb84088


HIVE-19986: Add logging of runtime statistics indicating when Hdfs Erasure 
Coding is used by MR (Andrew Sherman, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/14bb8408
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/14bb8408
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/14bb8408

Branch: refs/heads/master
Commit: 14bb84088f65467d2ea0cc828a48cd33e3e6666c
Parents: 68bdf9e
Author: Andrew Sherman <asher...@cloudera.com>
Authored: Wed Jul 25 16:23:51 2018 -0500
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Wed Jul 25 16:44:58 2018 -0500

----------------------------------------------------------------------
 .../jdbc/TestJdbcWithMiniHS2ErasureCoding.java  | 55 ++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/MapRedStats.java  | 43 ++++++++++++++-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |  2 +-
 .../hive/ql/processors/ErasureProcessor.java    | 10 +++-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java | 17 ++++++
 .../apache/hadoop/hive/shims/HadoopShims.java   | 10 ++++
 6 files changed, 134 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
index b0a0145..efb3759 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
@@ -19,6 +19,8 @@
 package org.apache.hive.jdbc;
 
 import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -31,11 +33,17 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.processors.ErasureProcessor;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
 import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.WriterAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.layout.PatternLayout;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -174,6 +182,53 @@ public class TestJdbcWithMiniHS2ErasureCoding {
   }
 
   /**
+   * Test MR stats.
+   */
+  @Test
+  public void testMapRedStats() throws Exception {
+    // Do log4j magic to save log output
+    StringWriter writer = new StringWriter();
+    Appender appender = addAppender(writer, "testMapRedStats");
+    try (Statement stmt = hs2Conn.createStatement()) {
+      String table = "mapredstats";
+      stmt.execute("set hive.execution.engine=mr");
+      stmt.execute(" CREATE TABLE " + table + " (a int) STORED AS PARQUET");
+      stmt.execute("INSERT INTO TABLE " + table + " VALUES (3)");
+      try (ResultSet rs = stmt.executeQuery("select a from " + table + " order 
by a")) {
+        while (rs.next()) {
+          int val = rs.getInt(1);
+          assertEquals(3, val);
+        }
+      }
+    }
+    String output = writer.toString();
+    // check for standard stats
+    assertTrue(output.contains("HDFS Read:"));
+    assertTrue(output.contains("HDFS Write:"));
+
+    // check for erasure coding stat
+    HadoopShims.HdfsErasureCodingShim erasureShim = 
ErasureProcessor.getErasureShim(conf);
+    if (erasureShim.isMapReduceStatAvailable()) {
+      assertTrue(output.contains("HDFS EC Read:"));
+    }
+  }
+
+  /**
+   * Add an appender to log4j.
+   * 
http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent
+   */
+  private Appender addAppender(final Writer writer, final String writerName) {
+    final LoggerContext context = LoggerContext.getContext(false);
+    final Configuration config = context.getConfiguration();
+    final PatternLayout layout = PatternLayout.createDefaultLayout(config);
+    final Appender appender =
+        WriterAppender.createAppender(layout, null, writer, writerName, false, 
true);
+    appender.start();
+    config.getRootLogger().addAppender(appender, null, null);
+    return appender;
+  }
+
+  /**
   * Add an Erasure Coding Policy to a Path.
    */
   private static void addErasurePolicy(MiniDFSShim dfs, String pathString, 
String policyName) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java 
b/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
index 483c3d9..ac45ec4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
@@ -18,8 +18,15 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.processors.ErasureProcessor;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * MapRedStats.
@@ -30,6 +37,9 @@ import org.apache.hadoop.mapred.Counters.Counter;
  *
  */
 public class MapRedStats {
+  private static final String CLASS_NAME = MapRedStats.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private JobConf jobConf;
   int numMap;
   int numReduce;
   long cpuMSec;
@@ -40,7 +50,8 @@ public class MapRedStats {
 
   private long numModifiedRows;
 
-  public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean 
ifSuccess, String jobId) {
+  public MapRedStats(JobConf jobConf, int numMap, int numReduce, long cpuMSec, 
boolean ifSuccess, String jobId) {
+    this.jobConf = jobConf;
     this.numMap = numMap;
     this.numReduce = numReduce;
     this.cpuMSec = cpuMSec;
@@ -144,10 +155,40 @@ public class MapRedStats {
       if (hdfsWrittenCntr != null && (hdfsWritten = 
hdfsWrittenCntr.getValue()) >= 0) {
         sb.append(" HDFS Write: " + hdfsWritten);
       }
+
+      HadoopShims.HdfsErasureCodingShim erasureShim = 
getHdfsErasureCodingShim();
+
+      if (erasureShim != null && erasureShim.isMapReduceStatAvailable()) {
+        // Erasure Coding stats - added in HADOOP-15507, expected in Hadoop 
3.2.0
+        Counter hdfsReadEcCntr = counters.findCounter("FileSystemCounters",
+            "HDFS_BYTES_READ_EC"); // FileSystemCounter.BYTES_READ_EC
+        if (hdfsReadEcCntr != null) {
+          long hdfsReadEc = hdfsReadEcCntr.getValue();
+          if (hdfsReadEc >= 0) {
+            sb.append(" HDFS EC Read: " + hdfsReadEc);
+          }
+        }
+      }
     }
 
     sb.append(" " + (success ? "SUCCESS" : "FAIL"));
 
     return sb.toString();
   }
+
+  /**
+   * Get the Erasure Coding Shim.
+   * @return a HdfsErasureCodingShim
+   */
+  private HadoopShims.HdfsErasureCodingShim getHdfsErasureCodingShim() {
+    HadoopShims.HdfsErasureCodingShim erasureShim = null;
+    try {
+      erasureShim = ErasureProcessor.getErasureShim(jobConf);
+    } catch (IOException e) {
+      // this should not happen
+      LOG.warn("Could not get Erasure Coding shim for reason: " + 
e.getMessage());
+      // fall through
+    }
+    return erasureShim;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index c31e22f..eb6cbf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -419,7 +419,7 @@ public class HadoopJobExecHelper {
       }
     }
 
-    MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, 
success, rj.getID().toString());
+    MapRedStats mapRedStats = new MapRedStats(job, numMap, numReduce, cpuMsec, 
success, rj.getID().toString());
     mapRedStats.setCounters(ctrs);
 
     // update based on the final value of the counters

http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java
index 46114f5..04cc8b0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java
@@ -31,6 +31,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -69,9 +70,16 @@ public class ErasureProcessor implements CommandProcessor {
   private HadoopShims.HdfsErasureCodingShim erasureCodingShim;
 
   ErasureProcessor(HiveConf config) throws IOException {
+    this.erasureCodingShim  = getErasureShim(config);
+  }
+
+  /**
+   * Get an instance of HdfsErasureCodingShim from a config.
+   */
+  public static HadoopShims.HdfsErasureCodingShim getErasureShim(Configuration 
config) throws IOException {
     HadoopShims hadoopShims = ShimLoader.getHadoopShims();
     FileSystem fileSystem = FileSystem.get(config);
-    this.erasureCodingShim  = 
hadoopShims.createHdfsErasureCodingShim(fileSystem, config);
+    return hadoopShims.createHdfsErasureCodingShim(fileSystem, config);
   }
 
   private CommandLine parseCommandArgs(final Options opts, String[] args) 
throws ParseException {

http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git 
a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java 
b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 02490f1..98c3eef 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -78,6 +79,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.WebHCatJTShim23;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -1607,5 +1609,20 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     public void disableErasureCodingPolicy(String ecPolicyName) throws 
IOException {
       hdfsAdmin.disableErasureCodingPolicy(ecPolicyName);
     }
+
+    /**
+     * @return true if the runtime MR stat for Erasure Coding is available.
+     */
+    @Override
+    public boolean isMapReduceStatAvailable() {
+      // Look for FileSystemCounter.BYTES_READ_EC, this is present in hadoop 
3.2
+      Field field = null;
+      try {
+        field = FileSystemCounter.class.getField("BYTES_READ_EC");
+      } catch (NoSuchFieldException e) {
+        // This version of Hadoop does not support EC stats for MR
+      }
+      return (field != null);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git 
a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java 
b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 2e84ca9..84e6430 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -691,6 +691,11 @@ public interface HadoopShims {
      * @param ecPolicyName the name of the erasure coding policy
      */
     void disableErasureCodingPolicy(String ecPolicyName) throws IOException;
+
+    /**
+     * @return true if the runtime MR stat for Erasure Coding is available.
+     */
+    boolean isMapReduceStatAvailable();
   }
 
   /**
@@ -728,6 +733,11 @@ public interface HadoopShims {
     public void disableErasureCodingPolicy(String ecPolicyName) throws 
IOException {
     }
 
+    @Override
+    public boolean isMapReduceStatAvailable() {
+      return false;
+    }
+
   }
 
   /**

Reply via email to