Repository: hive
Updated Branches:
  refs/heads/hive-14535 36ad3a405 -> b143f5ce3


HIVE-14990 : run all tests for MM tables and fix the issues that are found - issue with FetchOperator (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b143f5ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b143f5ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b143f5ce

Branch: refs/heads/hive-14535
Commit: b143f5ce3cf9546ff49eb658a18db43ac319583d
Parents: 36ad3a4
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Oct 26 18:44:30 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Oct 26 18:44:30 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FetchOperator.java      | 31 ++++++++++++++++
 .../hadoop/hive/ql/io/HiveInputFormat.java      | 39 ++++++++++++--------
 2 files changed, 55 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b143f5ce/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 7375cd4..f89372c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -36,7 +36,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
@@ -76,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 /**
  * FetchTask implementation.
@@ -125,6 +128,7 @@ public class FetchOperator implements Serializable {
 
   private transient StructObjectInspector outputOI;
   private transient Object[] row;
+  private transient Map<String, ValidWriteIds> writeIdMap;
 
   public FetchOperator(FetchWork work, JobConf job) throws HiveException {
     this(work, job, null, null);
@@ -369,6 +373,9 @@ public class FetchOperator implements Serializable {
       Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+      String inputs = processCurrPathForMmWriteIds(inputFormat);
+      if (inputs == null) return null;
+      job.set("mapred.input.dir", inputs);
 
       InputSplit[] splits = inputFormat.getSplits(job, 1);
       FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
@@ -385,6 +392,30 @@ public class FetchOperator implements Serializable {
     return null;
   }
 
+  private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOException {
+    if (inputFormat instanceof HiveInputFormat) {
+      return StringUtils.escapeString(currPath.toString()); // No need to process here.
+    }
+    if (writeIdMap == null) {
+      writeIdMap = new HashMap<String, ValidWriteIds>();
+    }
+    // No need to check for MM table - if it is, the IDs should be in the job config.
+    ValidWriteIds ids = HiveInputFormat.extractWriteIds(writeIdMap, job, currDesc.getTableName());
+    if (ids != null) {
+      Utilities.LOG14535.info("Observing " + currDesc.getTableName() + ": " + ids);
+    }
+
+    Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, ids);
+    if (dirs == null || dirs.length == 0) {
+      return null; // No valid inputs. This condition is logged inside the call.
+    }
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs[0].toString()));
+    for (int i = 1; i < dirs.length; i++) {
+      str.append(",").append(StringUtils.escapeString(dirs[i].toString()));
+    }
+    return str.toString();
+  }
+
   private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
       FetchInputFormatSplit[] splits) {
     long totalSize = 0;

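A side note on the "mapred.input.dir" value set above: it is just the surviving directories, escaped and joined with commas, the same convention FileInputFormat uses when setting input paths. A minimal standalone sketch of that joining step (the class and method names here are illustrative, not part of the patch):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.util.StringUtils;

  public class MmInputDirJoinSketch {
    // Builds the comma-separated, escaped directory list expected by
    // "mapred.input.dir"; assumes dirs is non-empty, as in the patch.
    public static String join(Path[] dirs) {
      StringBuilder sb = new StringBuilder(StringUtils.escapeString(dirs[0].toString()));
      for (int i = 1; i < dirs.length; i++) {
        sb.append(',').append(StringUtils.escapeString(dirs[i].toString()));
      }
      return sb.toString();
    }
  }
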
http://git-wip-us.apache.org/repos/asf/hive/blob/b143f5ce/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index a539799..428093c 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -364,19 +364,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan);
     }
 
-    if (writeIds == null) {
-      FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
-    } else {
-      List<Path> finalPaths = new ArrayList<>(dirs.size());
-      for (Path dir : dirs) {
-        processForWriteIds(dir, conf, writeIds, finalPaths);
-      }
-      if (finalPaths.isEmpty()) {
-        LOG.warn("No valid inputs found in " + dirs);
-        return;
-      }
-      FileInputFormat.setInputPaths(conf, finalPaths.toArray(new Path[finalPaths.size()]));
+    Path[] finalDirs = processPathsForMmRead(dirs, conf, writeIds);
+    if (finalDirs == null) {
+      return; // No valid inputs.
     }
+    FileInputFormat.setInputPaths(conf, finalDirs);
     conf.setInputFormat(inputFormat.getClass());
 
     int headerCount = 0;
@@ -396,7 +388,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
-  private void processForWriteIds(Path dir, JobConf conf,
+  public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
+      ValidWriteIds writeIds) throws IOException {
+    if (writeIds == null) {
+      return dirs.toArray(new Path[dirs.size()]);
+    } else {
+      List<Path> finalPaths = new ArrayList<>(dirs.size());
+      for (Path dir : dirs) {
+        processForWriteIds(dir, conf, writeIds, finalPaths);
+      }
+      if (finalPaths.isEmpty()) {
+        LOG.warn("No valid inputs found in " + dirs);
+        return null;
+      }
+      return finalPaths.toArray(new Path[finalPaths.size()]);
+    }
+  }
+
+  private static void processForWriteIds(Path dir, JobConf conf,
       ValidWriteIds writeIds, List<Path> finalPaths) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
     Utilities.LOG14535.warn("Checking " + dir + " (root) for inputs");
@@ -413,7 +422,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
-  private void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
+  private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
       LinkedList<Path> subdirs, List<Path> finalPaths) {
     Path path = file.getPath();
     Utilities.LOG14535.warn("Checking " + path + " for inputs");
@@ -561,7 +570,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
-  private static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
+  public static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
       JobConf newjob, String tableName) {
     if (StringUtils.isBlank(tableName)) return null;
     ValidWriteIds writeIds = writeIdMap.get(tableName);

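For readers following the refactoring: processPathsForMmRead() either returns the directories unchanged (no write IDs, i.e. not an MM read) or runs each root through processForWriteIds(), whose body is largely outside this diff. The general shape of that walk - list a directory, keep the children that qualify as inputs, and queue the remaining subdirectories for another pass - can be sketched as below. The actual validity test lives in ValidWriteIds, whose API is not shown in this diff, so it appears here as a generic predicate, and the class and method names are illustrative only:

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.function.Predicate;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class MmDirWalkSketch {
    // Breadth-first walk from 'root': plain files and directories accepted by
    // 'isValidInputDir' become inputs; any other directory is scanned further.
    public static List<Path> collect(Path root, Configuration conf,
        Predicate<Path> isValidInputDir) throws IOException {
      List<Path> finalPaths = new ArrayList<>();
      LinkedList<Path> subdirs = new LinkedList<>();
      subdirs.add(root);
      FileSystem fs = root.getFileSystem(conf);
      while (!subdirs.isEmpty()) {
        Path dir = subdirs.poll();
        for (FileStatus file : fs.listStatus(dir)) {
          Path path = file.getPath();
          if (!file.isDirectory() || isValidInputDir.test(path)) {
            finalPaths.add(path);   // take the file or accepted directory as an input
          } else {
            subdirs.add(path);      // look inside anything else
          }
        }
      }
      return finalPaths;
    }
  }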