HIVE-19258 : add originals support to MM tables (and make the conversion a 
metadata only operation) (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6090fac
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6090fac
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6090fac

Branch: refs/heads/branch-3
Commit: b6090fac60cb3b4a132a37df6ece94846949764d
Parents: a26f03e
Author: sergey <[email protected]>
Authored: Tue May 22 12:02:46 2018 -0700
Committer: sergey <[email protected]>
Committed: Tue Jun 5 12:17:39 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../hive/ql/txn/compactor/TestCompactor.java    |  96 +++++-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  17 +-
 .../hadoop/hive/ql/exec/FetchOperator.java      |  89 ++++--
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 108 ++++---
 .../hive/ql/io/BucketizedHiveInputFormat.java   |  39 ++-
 .../hadoop/hive/ql/io/HiveInputFormat.java      | 215 +++++++------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  43 ++-
 .../ql/parse/repl/dump/io/FileOperations.java   |  81 ++---
 .../ql/plan/ConditionalResolverMergeFiles.java  |   2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |   9 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |   4 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |   4 +
 .../queries/clientpositive/mm_conversions.q     |  40 ++-
 .../clientpositive/llap/mm_conversions.q.out    | 307 ++++++++++++++++++-
 15 files changed, 825 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ccd7a9e..ee543ab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2394,6 +2394,10 @@ public class HiveConf extends Configuration {
     HIVE_LOCK_QUERY_STRING_MAX_LENGTH("hive.lock.query.string.max.length", 
1000000,
         "The maximum length of the query string to store in the lock.\n" +
         "The default value is 1000000, since the data limit of a znode is 
1MB"),
+    HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false,
+        "Whether to allow original files in MM tables. Conversion to MM may be 
expensive if\n" +
+        "this is set to false, however unless MAPREDUCE-7086 fix is present, 
queries that\n" +
+        "read MM tables with original files will fail. The default in Hive 3.0 
is false."),
 
      // Zookeeper related configs
     HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index af42519..46c99d6 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -154,6 +155,7 @@ public class TestCompactor {
     TxnDbUtil.prepDb(hiveConf);
 
     conf = hiveConf;
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS, true);
     msClient = new HiveMetaStoreClient(conf);
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
@@ -984,6 +986,96 @@ public class TestCompactor {
   }
 
   @Test
+  public void mmTableOriginalsOrc() throws Exception {
+    mmTableOriginals("ORC");
+  }
+
+  @Test
+  public void mmTableOriginalsText() throws Exception {
+    mmTableOriginals("TEXTFILE");
+  }
+
+  private void mmTableOriginals(String format) throws Exception {
+    // Originals split won't work due to MAPREDUCE-7086 issue in 
FileInputFormat.
+    boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format);
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) 
STORED AS " +
+        format + " TBLPROPERTIES ('transactional'='false')", driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 
'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 
'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM 
"
+        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    FileSystem fs = FileSystem.get(conf);
+    executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
+       + "('transactional'='true', 'transactional_properties'='insert_only')", 
driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 3);
+    verifyHasBase(table.getSd(), fs, "base_0000001");
+
+    // Try with an extra delta.
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) 
STORED AS " +
+        format + " TBLPROPERTIES ('transactional'='false')", driver);
+    table = msClient.getTable(dbName, tblName);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 
'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 
'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM 
"
+        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    verifyFooBarResult(tblName, 3);
+
+    executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
+       + "('transactional'='true', 'transactional_properties'='insert_only')", 
driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM 
"
+        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+
+    // Neither select nor compaction (which is a select) will work after this.
+    if (isBrokenUntilMapreduce7086) return;
+
+    verifyFooBarResult(tblName, 9);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 9);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+
+    // Try with an extra base.
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) 
STORED AS " +
+        format + " TBLPROPERTIES ('transactional'='false')", driver);
+    table = msClient.getTable(dbName, tblName);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 
'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 
'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM 
"
+        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    verifyFooBarResult(tblName, 3);
+
+    executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
+       + "('transactional'='true', 'transactional_properties'='insert_only')", 
driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT 
a,b FROM "
+        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    verifyFooBarResult(tblName, 6);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 6);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+
+    msClient.close();
+  }
+
+
+  @Test
   public void mmTableBucketed() throws Exception {
     String dbName = "default";
     String tblName = "mm_nonpart";
@@ -1054,7 +1146,7 @@ public class TestCompactor {
     msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
     runMajorCompaction(dbName, tblName); // Compact 4 and 5.
     verifyFooBarResult(tblName, 2);
-    verifyHasBase(table.getSd(), fs, "base_0000005"); 
+    verifyHasBase(table.getSd(), fs, "base_0000005");
     runCleaner(conf);
     verifyDeltaCount(table.getSd(), fs, 0);
   }
@@ -1108,7 +1200,7 @@ public class TestCompactor {
         p2 = msClient.getPartition(dbName, tblName, "ds=2"),
         p3 = msClient.getPartition(dbName, tblName, "ds=3");
     msClient.close();
- 
+
     FileSystem fs = FileSystem.get(conf);
     verifyDeltaCount(p1.getSd(), fs, 3);
     verifyDeltaCount(p2.getSd(), fs, 2);

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index d4361d5..b7db1a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4474,7 +4474,22 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, 
alterTbl.getProps());
       if (isToMmTable != null) {
         if (!isFromMmTable && isToMmTable) {
-          result = generateAddMmTasks(tbl, alterTbl.getWriteId());
+          if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS)) {
+            result = generateAddMmTasks(tbl, alterTbl.getWriteId());
+          } else {
+            if (tbl.getPartitionKeys().size() > 0) {
+              Hive db = getHive();
+              PartitionIterable parts = new PartitionIterable(db, tbl, null,
+                  HiveConf.getIntVar(conf, 
ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+              Iterator<Partition> partIter = parts.iterator();
+              while (partIter.hasNext()) {
+                Partition part0 = partIter.next();
+                checkMmLb(part0);
+              }
+            } else {
+              checkMmLb(tbl);
+            }
+          }
         } else if (isFromMmTable && !isToMmTable) {
           throw new HiveException("Cannot convert an ACID table to non-ACID");
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 969c591..2246901 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -379,34 +379,59 @@ public class FetchOperator implements Serializable {
       Class<? extends InputFormat> formatter = 
currDesc.getInputFileFormatClass();
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
-      String inputs = processCurrPathForMmWriteIds(inputFormat);
-      if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-        Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs);
+      List<Path> dirs = new ArrayList<>(), dirsWithOriginals = new 
ArrayList<>();
+      processCurrPathForMmWriteIds(inputFormat, dirs, dirsWithOriginals);
+      if (dirs.isEmpty() && dirsWithOriginals.isEmpty()) {
+        LOG.debug("No valid directories for " + currPath);
+        continue;
       }
-      if (inputs == null) return null;
-      job.set("mapred.input.dir", inputs);
 
-      InputSplit[] splits = inputFormat.getSplits(job, 1);
-      FetchInputFormatSplit[] inputSplits = new 
FetchInputFormatSplit[splits.length];
-      for (int i = 0; i < splits.length; i++) {
-        inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+      List<FetchInputFormatSplit> inputSplits = new ArrayList<>();
+      if (!dirs.isEmpty()) {
+        String inputs = makeInputString(dirs);
+        Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to {}", inputs);
+        job.set("mapred.input.dir", inputs);
+
+        generateWrappedSplits(inputFormat, inputSplits, job);
+      }
+
+      if (!dirsWithOriginals.isEmpty()) {
+        String inputs = makeInputString(dirsWithOriginals);
+        Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to {}", 
inputs);
+        JobConf jobNoRec = HiveInputFormat.createConfForMmOriginalsSplit(job, 
dirsWithOriginals);
+        jobNoRec.set("mapred.input.dir", inputs);
+        generateWrappedSplits(inputFormat, inputSplits, jobNoRec);
       }
+
       if (work.getSplitSample() != null) {
         inputSplits = splitSampling(work.getSplitSample(), inputSplits);
       }
-      if (inputSplits.length > 0) {
-        if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
-          Arrays.sort(inputSplits, new FetchInputFormatSplitComparator());
-        }
-        return inputSplits;
+
+      if (inputSplits.isEmpty()) {
+        LOG.debug("No splits for " + currPath);
+        continue;
+      }
+      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
+        Collections.sort(inputSplits, new FetchInputFormatSplitComparator());
       }
+      return inputSplits.toArray(new 
FetchInputFormatSplit[inputSplits.size()]);
     }
+
     return null;
   }
 
-  private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws 
IOException {
+  private void generateWrappedSplits(InputFormat inputFormat,
+      List<FetchInputFormatSplit> inputSplits, JobConf job) throws IOException 
{
+    InputSplit[] splits = inputFormat.getSplits(job, 1);
+    for (int i = 0; i < splits.length; i++) {
+      inputSplits.add(new FetchInputFormatSplit(splits[i], inputFormat));
+    }
+  }
+
+  private void processCurrPathForMmWriteIds(InputFormat inputFormat,
+      List<Path> dirs, List<Path> dirsWithOriginals) throws IOException {
     if (inputFormat instanceof HiveInputFormat) {
-      return StringUtils.escapeString(currPath.toString()); // No need to 
process here.
+      dirs.add(currPath); // No need to process here.
     }
     ValidWriteIdList validWriteIdList;
     if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) {
@@ -418,17 +443,19 @@ public class FetchOperator implements Serializable {
       Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + 
" for MM paths");
     }
 
-    Path[] dirs = 
HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, 
validWriteIdList);
-    if (dirs == null || dirs.length == 0) {
-      return null; // No valid inputs. This condition is logged inside the 
call.
-    }
-    StringBuffer str = new 
StringBuffer(StringUtils.escapeString(dirs[0].toString()));
-    for(int i = 1; i < dirs.length;i++) {
-      str.append(",").append(StringUtils.escapeString(dirs[i].toString()));
+    HiveInputFormat.processPathsForMmRead(
+        Lists.newArrayList(currPath), job, validWriteIdList, dirs, 
dirsWithOriginals);
+  }
+
+  private String makeInputString(List<Path> dirs) {
+    if (dirs == null || dirs.isEmpty()) return "";
+    StringBuffer str = new 
StringBuffer(StringUtils.escapeString(dirs.get(0).toString()));
+    for(int i = 1; i < dirs.size(); i++) {
+      str.append(",").append(StringUtils.escapeString(dirs.get(i).toString()));
     }
     return str.toString();
-  }
 
+  }
   private ValidWriteIdList extractValidWriteIdList() {
     if (currDesc.getTableName() == null || 
!org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) {
       String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
@@ -438,18 +465,18 @@ public class FetchOperator implements Serializable {
     return null;  // not fetching from a table directly but from a temp 
location
   }
 
-  private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
-      FetchInputFormatSplit[] splits) {
+  private List<FetchInputFormatSplit> splitSampling(SplitSample splitSample,
+      List<FetchInputFormatSplit> splits) {
     long totalSize = 0;
     for (FetchInputFormatSplit split: splits) {
         totalSize += split.getLength();
     }
-    List<FetchInputFormatSplit> result = new 
ArrayList<FetchInputFormatSplit>(splits.length);
+    List<FetchInputFormatSplit> result = new 
ArrayList<FetchInputFormatSplit>(splits.size());
     long targetSize = splitSample.getTargetSize(totalSize);
-    int startIndex = splitSample.getSeedNum() % splits.length;
+    int startIndex = splitSample.getSeedNum() % splits.size();
     long size = 0;
-    for (int i = 0; i < splits.length; i++) {
-      FetchInputFormatSplit split = splits[(startIndex + i) % splits.length];
+    for (int i = 0; i < splits.size(); i++) {
+      FetchInputFormatSplit split = splits.get((startIndex + i) % 
splits.size());
       result.add(split);
       long splitgLength = split.getLength();
       if (size + splitgLength >= targetSize) {
@@ -460,7 +487,7 @@ public class FetchOperator implements Serializable {
       }
       size += splitgLength;
     }
-    return result.toArray(new FetchInputFormatSplit[result.size()]);
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 51a793f..7fce67f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
@@ -149,6 +150,7 @@ public class AcidUtils {
   public static final int MAX_STATEMENTS_PER_TXN = 10000;
   public static final Pattern BUCKET_DIGIT_PATTERN = 
Pattern.compile("[0-9]{5}$");
   public static final Pattern   LEGACY_BUCKET_DIGIT_PATTERN = 
Pattern.compile("^[0-9]{6}");
+
   /**
    * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1
    * (Unless via Load Data statement)
@@ -390,6 +392,57 @@ public class AcidUtils {
     }
     return result;
   }
+
+  public static final class DirectoryImpl implements Directory {
+    private final List<FileStatus> abortedDirectories;
+    private final boolean isBaseInRawFormat;
+    private final List<HdfsFileStatusWithId> original;
+    private final List<FileStatus> obsolete;
+    private final List<ParsedDelta> deltas;
+    private final Path base;
+
+    public DirectoryImpl(List<FileStatus> abortedDirectories,
+        boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
+        List<FileStatus> obsolete, List<ParsedDelta> deltas, Path base) {
+      this.abortedDirectories = abortedDirectories;
+      this.isBaseInRawFormat = isBaseInRawFormat;
+      this.original = original;
+      this.obsolete = obsolete;
+      this.deltas = deltas;
+      this.base = base;
+    }
+
+    @Override
+    public Path getBaseDirectory() {
+      return base;
+    }
+
+    @Override
+    public boolean isBaseInRawFormat() {
+      return isBaseInRawFormat;
+    }
+
+    @Override
+    public List<HdfsFileStatusWithId> getOriginalFiles() {
+      return original;
+    }
+
+    @Override
+    public List<ParsedDelta> getCurrentDirectories() {
+      return deltas;
+    }
+
+    @Override
+    public List<FileStatus> getObsolete() {
+      return obsolete;
+    }
+
+    @Override
+    public List<FileStatus> getAbortedDirectories() {
+      return abortedDirectories;
+    }
+  }
+
   //This is used for (full) Acid tables.  InsertOnly use NOT_ACID
   public enum Operation implements Serializable {
     NOT_ACID, INSERT, UPDATE, DELETE;
@@ -984,7 +1037,7 @@ public class AcidUtils {
       // Okay, we're going to need these originals.  Recurse through them and 
figure out what we
       // really need.
       for (FileStatus origDir : originalDirectories) {
-        findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles);
+        findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, 
true);
       }
     }
 
@@ -1056,7 +1109,7 @@ public class AcidUtils {
      * If this sort order is changed and there are tables that have been 
converted to transactional
      * and have had any update/delete/merge operations performed but not yet 
MAJOR compacted, it
      * may result in data loss since it may change how
-     * {@link 
org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns 
+     * {@link 
org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns
      * {@link RecordIdentifier#rowId} for read (that have happened) and 
compaction (yet to happen).
      */
     Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId 
o2) -> {
@@ -1066,37 +1119,8 @@ public class AcidUtils {
 
     // Note: isRawFormat is invalid for non-ORC tables. It will always return 
true, so we're good.
     final boolean isBaseInRawFormat = base != null && 
MetaDataFile.isRawFormat(base, fs);
-    return new Directory() {
-
-      @Override
-      public Path getBaseDirectory() {
-        return base;
-      }
-      @Override
-      public boolean isBaseInRawFormat() {
-        return isBaseInRawFormat;
-      }
-
-      @Override
-      public List<HdfsFileStatusWithId> getOriginalFiles() {
-        return original;
-      }
-
-      @Override
-      public List<ParsedDelta> getCurrentDirectories() {
-        return deltas;
-      }
-
-      @Override
-      public List<FileStatus> getObsolete() {
-        return obsolete;
-      }
-
-      @Override
-      public List<FileStatus> getAbortedDirectories() {
-        return abortedDirectories;
-      }
-    };
+    return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original,
+        obsolete, deltas, base);
   }
   /**
    * We can only use a 'base' if it doesn't have an open txn (from specific 
reader's point of view)
@@ -1208,8 +1232,9 @@ public class AcidUtils {
    * @param original the list of original files
    * @throws IOException
    */
-  private static void findOriginals(FileSystem fs, FileStatus stat,
-      List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds, boolean 
ignoreEmptyFiles) throws IOException {
+  public static void findOriginals(FileSystem fs, FileStatus stat,
+      List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds,
+      boolean ignoreEmptyFiles, boolean recursive) throws IOException {
     assert stat.isDir();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
@@ -1228,8 +1253,10 @@ public class AcidUtils {
     }
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        if (child.getFileStatus().isDir()) {
-          findOriginals(fs, child.getFileStatus(), original, useFileIds, 
ignoreEmptyFiles);
+        if (child.getFileStatus().isDirectory()) {
+          if (recursive) {
+            findOriginals(fs, child.getFileStatus(), original, useFileIds, 
ignoreEmptyFiles, true);
+          }
         } else {
           if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) {
             original.add(child);
@@ -1240,7 +1267,9 @@ public class AcidUtils {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, 
stat.getPath(), hiddenFileFilter);
       for (FileStatus child : children) {
         if (child.isDir()) {
-          findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles);
+          if (recursive) {
+            findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, 
true);
+          }
         } else {
           if(!ignoreEmptyFiles || child.getLen() > 0) {
             original.add(createOriginalObj(null, child));
@@ -1250,6 +1279,7 @@ public class AcidUtils {
     }
   }
 
+
   public static boolean isTablePropertyTransactional(Properties props) {
     String resultStr = 
props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
     if (resultStr == null) {
@@ -1817,7 +1847,7 @@ public class AcidUtils {
       return null;
     }
     Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList);
-    // Assume that for an MM table, or if there's only the base directory, we 
are good. 
+    // Assume that for an MM table, or if there's only the base directory, we 
are good.
     if (!acidInfo.getCurrentDirectories().isEmpty() && 
AcidUtils.isFullAcidTable(table)) {
       Utilities.FILE_OP_LOGGER.warn(
           "Computing stats for an ACID table; stats may be inaccurate");

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index 75fa09d..5d20931 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -18,21 +18,16 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.curator.shaded.com.google.common.collect.Lists;
-
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -42,6 +37,10 @@ import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
 
 /**
  * BucketizedHiveInputFormat serves the similar function as hiveInputFormat but
@@ -139,21 +138,39 @@ public class BucketizedHiveInputFormat<K extends 
WritableComparable, V extends W
         mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null);
       }
       // TODO: should this also handle ACID operation, etc.? seems to miss a 
lot of stuff from HIF.
-      Path[] finalDirs = (mmIds == null) ? new Path[] { dir }
-        : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds);
-      if (finalDirs == null) {
+      List<Path> finalDirs = null, dirsWithMmOriginals = null;
+      if (mmIds == null) {
+        finalDirs = Lists.newArrayList(dir);
+      } else {
+        finalDirs = new ArrayList<>();
+        dirsWithMmOriginals = new ArrayList<>();
+        processPathsForMmRead(
+            Lists.newArrayList(dir), newjob, mmIds, finalDirs, 
dirsWithMmOriginals);
+      }
+      if (finalDirs.isEmpty() && (dirsWithMmOriginals == null || 
dirsWithMmOriginals.isEmpty())) {
         continue; // No valid inputs - possible in MM case.
       }
 
       for (Path finalDir : finalDirs) {
         FileStatus[] listStatus = listStatus(newjob, finalDir);
-
         for (FileStatus status : listStatus) {
           numOrigSplits = addBHISplit(
               status, inputFormat, inputFormatClass, numOrigSplits, newjob, 
result);
         }
       }
+      if (dirsWithMmOriginals != null) {
+        for (Path originalsDir : dirsWithMmOriginals) {
+          FileSystem fs = originalsDir.getFileSystem(job);
+          FileStatus[] listStatus = fs.listStatus(dir, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+          for (FileStatus status : listStatus) {
+            if (status.isDirectory()) continue;
+            numOrigSplits = addBHISplit(
+                status, inputFormat, inputFormatClass, numOrigSplits, newjob, 
result);
+          }
+        }
+      }
     }
+
     LOG.info(result.size() + " bucketized splits generated from "
         + numOrigSplits + " original splits.");
     return result.toArray(new BucketizedHiveInputSplit[result.size()]);

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3d965c0..bcc0508 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -29,35 +29,30 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hive.common.util.Ref;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -65,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import 
org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
@@ -82,7 +78,10 @@ import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 import org.apache.hive.common.util.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -488,45 +487,72 @@ public class HiveInputFormat<K extends 
WritableComparable, V extends Writable>
       pushFilters(conf, tableScan, this.mrwork);
     }
 
-    Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList);
-    if (finalDirs == null) {
+    List<Path> dirsWithFileOriginals = new ArrayList<>(), finalDirs = new 
ArrayList<>();
+    processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, 
dirsWithFileOriginals);
+    if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) {
       // This is for transactional tables.
       if (!conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
         LOG.warn("No valid inputs found in " + dirs);
-        return; // No valid inputs.
       } else if (validMmWriteIdList != null) {
         // AcidUtils.getAcidState() is already called to verify there is no 
input split.
         // Thus for a GroupByOperator summary row, set finalDirs and add a 
Dummy split here.
-        finalDirs = dirs.toArray(new Path[dirs.size()]);
-        result.add(new HiveInputSplit(new 
NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()),
-            ZeroRowsInputFormat.class.getName()));
+        result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(
+            dirs.get(0).toString()), ZeroRowsInputFormat.class.getName()));
       }
-    } else {
-      FileInputFormat.setInputPaths(conf, finalDirs);
-      conf.setInputFormat(inputFormat.getClass());
-
-      int headerCount = 0;
-      int footerCount = 0;
-      if (table != null) {
-        headerCount = Utilities.getHeaderCount(table);
-        footerCount = Utilities.getFooterCount(table, conf);
-        if (headerCount != 0 || footerCount != 0) {
-          // Input file has header or footer, cannot be splitted.
-          HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, 
Long.MAX_VALUE);
-        }
+      return; // No valid inputs.
+    }
+
+    conf.setInputFormat(inputFormat.getClass());
+    int headerCount = 0;
+    int footerCount = 0;
+    if (table != null) {
+      headerCount = Utilities.getHeaderCount(table);
+      footerCount = Utilities.getFooterCount(table, conf);
+      if (headerCount != 0 || footerCount != 0) {
+        // Input file has header or footer, cannot be splitted.
+        HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE);
       }
+    }
 
+    if (!finalDirs.isEmpty()) {
+      FileInputFormat.setInputPaths(conf, finalDirs.toArray(new 
Path[finalDirs.size()]));
       InputSplit[] iss = inputFormat.getSplits(conf, splits);
       for (InputSplit is : iss) {
         result.add(new HiveInputSplit(is, inputFormatClass.getName()));
       }
-      if (iss.length == 0 && finalDirs.length > 0 && 
conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
-        // If there are no inputs; the Execution engine skips the operator 
tree.
-        // To prevent it from happening; an opaque  ZeroRows input is added 
here - when needed.
-        result.add(new HiveInputSplit(new 
NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()),
-                ZeroRowsInputFormat.class.getName()));
+    }
+
+    if (!dirsWithFileOriginals.isEmpty()) {
+      // We are going to add splits for these directories with recursive = 
false, so we ignore
+      // any subdirectories (deltas or original directories) and only read the 
original files.
+      // The fact that there's a loop calling addSplitsForGroup already 
implies it's ok to call
+      // the real input format multiple times... however some split 
concurrency/etc configs
+      // that are applied separately in each call will effectively be ignored 
for such splits.
+      JobConf nonRecConf = createConfForMmOriginalsSplit(conf, 
dirsWithFileOriginals);
+      InputSplit[] iss = inputFormat.getSplits(nonRecConf, splits);
+      for (InputSplit is : iss) {
+        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
       }
     }
+
+    if (result.isEmpty() && 
conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) {
+      // If there are no inputs; the Execution engine skips the operator tree.
+      // To prevent it from happening; an opaque  ZeroRows input is added here 
- when needed.
+      result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(
+          finalDirs.get(0).toString()), ZeroRowsInputFormat.class.getName()));
+    }
+  }
+
+  public static JobConf createConfForMmOriginalsSplit(
+      JobConf conf, List<Path> dirsWithFileOriginals) {
+    JobConf nonRecConf = new JobConf(conf);
+    FileInputFormat.setInputPaths(nonRecConf,
+        dirsWithFileOriginals.toArray(new Path[dirsWithFileOriginals.size()]));
+    nonRecConf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false);
+    nonRecConf.setBoolean("mapred.input.dir.recursive", false);
+    // TODO: change to FileInputFormat.... field after MAPREDUCE-7086.
+    
nonRecConf.setBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs",
 true);
+    return nonRecConf;
   }
 
   protected ValidWriteIdList getMmValidWriteIds(
@@ -543,71 +569,84 @@ public class HiveInputFormat<K extends 
WritableComparable, V extends Writable>
     return validWriteIdList;
   }
 
-  public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
-      ValidWriteIdList validWriteIdList) throws IOException {
-     if (validWriteIdList == null) {
-      return dirs.toArray(new Path[dirs.size()]);
-    } else {
-      List<Path> finalPaths = new ArrayList<>(dirs.size());
-      for (Path dir : dirs) {
-        processForWriteIds(dir, conf, validWriteIdList, finalPaths);
-      }
-      if (finalPaths.isEmpty()) {
-        return null;
-      }
-      return finalPaths.toArray(new Path[finalPaths.size()]);
+  public static void processPathsForMmRead(List<Path> dirs, Configuration conf,
+      ValidWriteIdList validWriteIdList, List<Path> finalPaths,
+      List<Path> pathsWithFileOriginals) throws IOException {
+    if (validWriteIdList == null) {
+      finalPaths.addAll(dirs);
+      return;
+    }
+    boolean allowOriginals = HiveConf.getBoolVar(conf, 
ConfVars.HIVE_MM_ALLOW_ORIGINALS);
+    for (Path dir : dirs) {
+      processForWriteIds(
+          dir, conf, validWriteIdList, allowOriginals, finalPaths, 
pathsWithFileOriginals);
     }
   }
 
-  private static void processForWriteIds(Path dir, JobConf conf,
-      ValidWriteIdList validWriteIdList, List<Path> finalPaths) throws 
IOException {
+  private static void processForWriteIds(Path dir, Configuration conf,
+      ValidWriteIdList validWriteIdList, boolean allowOriginals, List<Path> 
finalPaths,
+      List<Path> pathsWithFileOriginals) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
-    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs");
-    }
+    Utilities.FILE_OP_LOGGER.trace("Checking {} for inputs", dir);
+
     // Ignore nullscan-optimized paths.
     if (fs instanceof NullScanFileSystem) {
       finalPaths.add(dir);
       return;
     }
 
-    // Tez require the use of recursive input dirs for union processing, so we 
have to look into the
-    // directory to find out
-    LinkedList<Path> subdirs = new LinkedList<>();
-    subdirs.add(dir); // add itself as a starting point
-    while (!subdirs.isEmpty()) {
-      Path currDir = subdirs.poll();
-      FileStatus[] files = fs.listStatus(currDir);
-      boolean hadAcidState = false;   // whether getAcidState has been called 
for currDir
-      for (FileStatus file : files) {
-        Path path = file.getPath();
-        if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-          Utilities.FILE_OP_LOGGER.trace("Checking " + path + " for inputs");
+    // We need to iterate to detect original directories, that are supported 
in MM but not ACID.
+    boolean hasOriginalFiles = false, hasAcidDirs = false;
+    List<Path> originalDirectories = new ArrayList<>();
+    for (FileStatus file : fs.listStatus(dir, AcidUtils.hiddenFileFilter)) {
+      Path currDir = file.getPath();
+      Utilities.FILE_OP_LOGGER.trace("Checking {} for being an input", 
currDir);
+      if (!file.isDirectory()) {
+        hasOriginalFiles = true;
+      } else if (AcidUtils.extractWriteId(currDir) == null) {
+        if (allowOriginals) {
+          originalDirectories.add(currDir); // Add as is; it would become a 
recursive split.
+        } else {
+          Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) 
directory {}", currDir);
         }
-        if (!file.isDirectory()) {
-          Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " 
+ path);
-        } else if (AcidUtils.extractWriteId(path) == null) {
-          subdirs.add(path);
-        } else if (!hadAcidState) {
-          AcidUtils.Directory dirInfo
-                  = AcidUtils.getAcidState(currDir, conf, validWriteIdList, 
Ref.from(false), true, null);
-          hadAcidState = true;
-
-          // Find the base, created for IOW.
-          Path base = dirInfo.getBaseDirectory();
-          if (base != null) {
-            finalPaths.add(base);
-          }
+      } else {
+        hasAcidDirs = true;
+      }
+    }
+    if (hasAcidDirs) {
+      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
+          dir, conf, validWriteIdList, Ref.from(false), true, null);
 
-          // Find the parsed delta files.
-          for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
-            Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath());
-            finalPaths.add(delta.getPath());
-          }
-        }
+      // Find the base, created for IOW.
+      Path base = dirInfo.getBaseDirectory();
+      if (base != null) {
+        Utilities.FILE_OP_LOGGER.debug("Adding input {}", base);
+        finalPaths.add(base);
+        // Base means originals no longer matter.
+        originalDirectories.clear();
+        hasOriginalFiles = false;
+      }
+
+      // Find the parsed delta files.
+      for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
+        Utilities.FILE_OP_LOGGER.debug("Adding input {}", delta.getPath());
+        finalPaths.add(delta.getPath());
+      }
+    }
+    if (!originalDirectories.isEmpty()) {
+      Utilities.FILE_OP_LOGGER.debug("Adding original directories {}", 
originalDirectories);
+      finalPaths.addAll(originalDirectories);
+    }
+    if (hasOriginalFiles) {
+      if (allowOriginals) {
+        Utilities.FILE_OP_LOGGER.debug("Directory has original files {}", dir);
+        pathsWithFileOriginals.add(dir);
+      } else {
+        Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) files in 
{}", dir);
       }
     }
   }
+ 
 
   Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs;
@@ -719,7 +758,7 @@ public class HiveInputFormat<K extends WritableComparable, 
V extends Writable>
       pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer);
     }
 
-    if (dirs.length != 0) {
+    if (dirs.length != 0) { // TODO: should this be currentDirs?
       if (LOG.isInfoEnabled()) {
         LOG.info("Generating splits for dirs: {}", dirs);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index d58b82e..694cf75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -105,6 +105,7 @@ import 
org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -117,6 +118,7 @@ import org.apache.orc.ColumnStatistics;
 import org.apache.orc.FileFormatException;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.Footer;
+import org.apache.orc.OrcProto.Type;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.StripeStatistics;
@@ -342,7 +344,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     return false;
   }
 
-  
+
   public static boolean[] genIncludedColumns(TypeDescription readerSchema,
                                              List<Integer> included) {
     return genIncludedColumns(readerSchema, included, null);
@@ -701,15 +703,15 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       // & therefore we should be able to retrieve them here and determine 
appropriate behavior.
       // Note that this will be meaningless for non-acid tables & will be set 
to null.
       //this is set by Utilities.copyTablePropertiesToConf()
-      boolean isTableTransactional = 
conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
-      String transactionalProperties = 
conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
-      this.acidOperationalProperties = isTableTransactional ?
-          AcidOperationalProperties.parseString(transactionalProperties) : 
null;
+      boolean isTxnTable = 
conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+      String txnProperties = 
conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+      this.acidOperationalProperties = isTxnTable
+          ? AcidOperationalProperties.parseString(txnProperties) : null;
 
       String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
       writeIdList = value == null ? new ValidReaderWriteIdList() : new 
ValidReaderWriteIdList(value);
       LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString()
-              + " isTransactionalTable: " + isTableTransactional);
+              + " isTransactionalTable: " + isTxnTable + " properties: " + 
txnProperties);
     }
 
     @VisibleForTesting
@@ -1144,6 +1146,7 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     private final Ref<Boolean> useFileIds;
     private final UserGroupInformation ugi;
 
+    @VisibleForTesting
     FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
         UserGroupInformation ugi) {
       this(context, fs, dir, Ref.from(useFileIds), ugi);
@@ -1176,13 +1179,31 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
     }
 
     private AcidDirInfo callInternal() throws IOException {
+      if (context.acidOperationalProperties != null
+          && context.acidOperationalProperties.isInsertOnly()) {
+        // See the class comment - HIF handles MM for all input formats, so if 
we try to handle it
+        // again, in particular for the non-recursive originals-only getSplits 
call, we will just
+        // get confused. This bypass was not necessary when MM tables didn't 
support originals. Now
+        // that they do, we use this path for anything MM table related, 
although everything except
+        // the originals could still be handled by AcidUtils like a regular 
non-txn table.
+        boolean isRecursive = 
context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,
+            context.conf.getBoolean("mapred.input.dir.recursive", false));
+        List<HdfsFileStatusWithId> originals = new ArrayList<>();
+        List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
+        AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, 
useFileIds, true, isRecursive);
+        for (HdfsFileStatusWithId fileId : originals) {
+          baseFiles.add(new AcidBaseFileInfo(fileId, 
AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
+        }
+        return new AcidDirInfo(fs, dir, new 
AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
+            Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new 
ArrayList<>());
+      }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
-          context.writeIdList, useFileIds, true, null);
+      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
+          dir, context.conf, context.writeIdList, useFileIds, true, null);
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
-        //for non-acid tables, all data files are in getOriginalFiles() list
+        // For non-acid tables (or paths), all data files are in 
getOriginalFiles() list
         for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
           baseFiles.add(new AcidBaseFileInfo(fileId, 
AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
@@ -1197,7 +1218,6 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
       // Find the parsed deltas- some of them containing only the insert delta 
events
       // may get treated as base if split-update is enabled for ACID. (See 
HIVE-14035 for details)
       List<ParsedDelta> parsedDeltas = new ArrayList<>();
-
       if (context.acidOperationalProperties != null &&
           context.acidOperationalProperties.isSplitUpdate()) {
         // If we have split-update turned on for this table, then the delta 
events have already been
@@ -1258,7 +1278,8 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
         We already handled all delete deltas above and there should not be any 
other deltas for
         any table type.  (this was acid 1.0 code path).
          */
-        assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir 
list?!: " + dir;
+        assert dirInfo.getCurrentDirectories().isEmpty() :
+            "Non empty curDir list?!: " + dirInfo.getCurrentDirectories();
         // When split-update is not enabled, then all the deltas in the 
current directories
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index b3e76b6..085f4a1 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -17,6 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,7 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -35,14 +45,6 @@ import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.login.LoginException;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.List;
-
 //TODO: this object is created once to call one method and then immediately 
destroyed.
 //So it's basically just a roundabout way to pass arguments to a static 
method. Simplify?
 public class FileOperations {
@@ -101,45 +103,48 @@ public class FileOperations {
   }
 
   private void copyMmPath() throws LoginException, IOException {
-    assert dataPathList.size() == 1;
     ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, 
mmCtx.getFqTableName());
-    Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0));
-    List<Path> validPaths = getMmValidPaths(ids, fromPath);
-    String fromPathStr = fromPath.toString();
-    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
-       fromPathStr += Path.SEPARATOR;
-    }
-    for (Path validPath : validPaths) {
-      // Export valid directories with a modified name so they don't look like 
bases/deltas.
-      // We could also dump the delta contents all together and rename the 
files if names collide.
-      String mmChildPath = "export_old_" + 
validPath.toString().substring(fromPathStr.length());
-      Path destPath = new Path(exportRootDataDir, mmChildPath);
-      exportFileSystem.mkdirs(destPath);
-      copyOneDataPath(validPath, destPath);
+    for (Path fromPath : dataPathList) {
+      fromPath = dataFileSystem.makeQualified(fromPath);
+      List<Path> validPaths = new ArrayList<>(), dirsWithOriginals = new 
ArrayList<>();
+      HiveInputFormat.processPathsForMmRead(dataPathList,
+          hiveConf, ids, validPaths, dirsWithOriginals);
+      String fromPathStr = fromPath.toString();
+      if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+         fromPathStr += Path.SEPARATOR;
+      }
+      for (Path validPath : validPaths) {
+        // Export valid directories with a modified name so they don't look 
like bases/deltas.
+        // We could also dump the delta contents all together and rename the 
files if names collide.
+        String mmChildPath = "export_old_" + 
validPath.toString().substring(fromPathStr.length());
+        Path destPath = new Path(exportRootDataDir, mmChildPath);
+        Utilities.FILE_OP_LOGGER.debug("Exporting {} to {}", validPath, 
destPath);
+        exportFileSystem.mkdirs(destPath);
+        copyOneDataPath(validPath, destPath);
+      }
+      for (Path dirWithOriginals : dirsWithOriginals) {
+        FileStatus[] files = dataFileSystem.listStatus(dirWithOriginals, 
AcidUtils.hiddenFileFilter);
+        List<Path> srcPaths = new ArrayList<>();
+        for (FileStatus fileStatus : files) {
+          if (fileStatus.isDirectory()) continue;
+          srcPaths.add(fileStatus.getPath());
+        }
+        Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}",
+            dirWithOriginals, exportRootDataDir);
+        new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, 
srcPaths);
+      }
     }
   }
 
-  private List<Path> getMmValidPaths(ValidWriteIdList ids, Path fromPath) 
throws IOException {
-    Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", 
fromPath);
-    AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, 
ids);
-    List<Path> validPaths = new ArrayList<>();
-    Path base = acidState.getBaseDirectory();
-    if (base != null) {
-      validPaths.add(base);
-    }
-    for (ParsedDelta pd : acidState.getCurrentDirectories()) {
-      validPaths.add(pd.getPath());
-    }
-    return validPaths;
-  }
+
 
   /**
    * This needs the root data directory to which the data needs to be exported 
to.
    * The data export here is a list of files either in table/partition that 
are written to the _files
-   * in the exportRootDataDir provided. In case of MM/ACID tables, we expect 
this pathlist to be
-   * already passed as valid paths by caller based on ValidWriteIdList. So, 
mmCtx is ignored here.
+   * in the exportRootDataDir provided.
    */
   private void exportFilesAsList() throws SemanticException, IOException {
+    // This is only called for replication that handles MM tables; no need for 
mmCtx.
     try (BufferedWriter writer = writer()) {
       for (Path dataPath : dataPathList) {
         writeFilesList(listFilesInDir(dataPath), writer, 
AcidUtils.getAcidSubDir(dataPath));

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 80f77b9..e77fc3e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -186,7 +186,7 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
           }
         }
       } else {
-        Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath);
+        Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + 
dirPath, new Exception());
         resTsks.add(mvTask);
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b698c84..982b180 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -337,9 +337,11 @@ public class CompactorMR {
     }
 
     int deltaCount = dir.getCurrentDirectories().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+    int origCount = dir.getOriginalFiles().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 
1) {
       LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
-        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas 
and "
+        + origCount + " originals");
       return;
     }
     try {
@@ -355,7 +357,8 @@ public class CompactorMR {
 
       // Note: we could skip creating the table and just add table type stuff 
directly to the
       //       "insert overwrite directory" command if there were no bucketing 
or list bucketing.
-      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() 
+ "_", tmpTableName;
+      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() 
+ "_";
+      String tmpTableName = null;
       while (true) {
         tmpTableName = tmpPrefix + System.currentTimeMillis();
         String query = buildMmCompactionCtQuery(tmpTableName, t,

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 3e2784b..a4d34a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -198,7 +198,7 @@ public class TestTxnCommands extends 
TxnCommandsBaseForTests {
         String.format("select a,b from %s order by a,b", importName));
     Assert.assertEquals("After import: " + rs, allData, rs);
     runStatementOnDriver("drop table if exists " + importName);
-    
+
     // Do insert overwrite to create some invalid deltas, and import into a 
non-MM table.
     int[][] rows2 = {{5,6},{7,8}};
     runStatementOnDriver(String.format("insert overwrite table %s %s",
@@ -259,7 +259,7 @@ public class TestTxnCommands extends 
TxnCommandsBaseForTests {
     return paths;
   }
 
-  
+
   /**
    * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
    * @throws Exception

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 8a55d8c..7319ba0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -35,6 +35,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Set;
 
 public abstract class TxnCommandsBaseForTests {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TxnCommandsBaseForTests.class);
   //bucket count for test tables; set it to 1 for easier debugging
   final static int BUCKET_COUNT = 2;
   @Rule
@@ -95,6 +97,7 @@ public abstract class TxnCommandsBaseForTests {
         
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, 
true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    hiveConf.setBoolean("mapred.input.dir.recursive", true);
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.prepDb(hiveConf);
     File f = new File(getWarehouseDir());
@@ -159,6 +162,7 @@ public abstract class TxnCommandsBaseForTests {
   }
 
   List<String> runStatementOnDriver(String stmt) throws Exception {
+    LOG.info("Running the query: " + stmt);
     CommandProcessorResponse cpr = d.run(stmt);
     if(cpr.getResponseCode() != 0) {
       throw new RuntimeException(stmt + " failed: " + cpr);

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/test/queries/clientpositive/mm_conversions.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_conversions.q 
b/ql/src/test/queries/clientpositive/mm_conversions.q
index 14d16ab..ae45c18 100644
--- a/ql/src/test/queries/clientpositive/mm_conversions.q
+++ b/ql/src/test/queries/clientpositive/mm_conversions.q
@@ -14,21 +14,42 @@ insert into table intermediate partition(p='455') select distinct key from src w
 insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1;
 insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1;
 
+set hive.mm.allow.originals=true;
+set hive.exim.test.mode=true;
 
 drop table simple_to_mm;
-create table simple_to_mm(key int) stored as orc;
+create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false");
 insert into table simple_to_mm select key from intermediate;
 select * from simple_to_mm s1 order by key;
 alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only");
+export table simple_to_mm to 'ql/test/data/exports/export0';
 select * from simple_to_mm s2 order by key;
+create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false");
+import table import_converted0_mm from 'ql/test/data/exports/export0';
+select * from import_converted0_mm order by key;
+drop table import_converted0_mm;
+
 insert into table simple_to_mm select key from intermediate;
 insert into table simple_to_mm select key from intermediate;
+export table simple_to_mm to 'ql/test/data/exports/export1';
 select * from simple_to_mm s3 order by key;
+create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false");
+import table import_converted1_mm from 'ql/test/data/exports/export1';
+select * from import_converted1_mm order by key;
+drop table import_converted1_mm;
+
+insert overwrite table simple_to_mm select key from intermediate;
+export table simple_to_mm to 'ql/test/data/exports/export2';
+select * from simple_to_mm s4 order by key;
+create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false");
+import table import_converted2_mm from 'ql/test/data/exports/export2';
+select * from import_converted2_mm order by key;
+drop table import_converted2_mm;
 drop table simple_to_mm;
 
 
 drop table part_to_mm;
-create table part_to_mm(key int) partitioned by (key_mm int) stored as orc;
+create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false");
 insert into table part_to_mm partition(key_mm='455') select key from intermediate;
 insert into table part_to_mm partition(key_mm='456') select key from intermediate;
 select * from part_to_mm s1 order by key, key_mm;
@@ -39,4 +60,19 @@ insert into table part_to_mm partition(key_mm='457') select key from intermediat
 select * from part_to_mm s3 order by key, key_mm;
 drop table part_to_mm;
 
+set hive.mm.allow.originals=false;
+
+drop table simple_to_mm_text;
+create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false");
+insert into table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t1 order by key;
+alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only");
+select * from simple_to_mm_text t2 order by key;
+insert into table simple_to_mm_text select key from intermediate;
+insert into table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t3 order by key;
+insert overwrite table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t4 order by key;
+drop table simple_to_mm_text;
+
 drop table intermediate;

http://git-wip-us.apache.org/repos/asf/hive/blob/b6090fac/ql/src/test/results/clientpositive/llap/mm_conversions.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_conversions.q.out b/ql/src/test/results/clientpositive/llap/mm_conversions.q.out
index 4754710..8a9036a 100644
--- a/ql/src/test/results/clientpositive/llap/mm_conversions.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_conversions.q.out
@@ -41,11 +41,11 @@ PREHOOK: query: drop table simple_to_mm
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table simple_to_mm
 POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple_to_mm(key int) stored as orc
+PREHOOK: query: create table simple_to_mm(key int) stored as orc 
tblproperties("transactional"="false")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: create table simple_to_mm(key int) stored as orc
+POSTHOOK: query: create table simple_to_mm(key int) stored as orc 
tblproperties("transactional"="false")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@simple_to_mm
@@ -83,6 +83,14 @@ POSTHOOK: query: alter table simple_to_mm set 
tblproperties("transactional"="tru
 POSTHOOK: type: ALTERTABLE_PROPERTIES
 POSTHOOK: Input: default@simple_to_mm
 POSTHOOK: Output: default@simple_to_mm
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
 PREHOOK: query: select * from simple_to_mm s2 order by key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@simple_to_mm
@@ -94,6 +102,41 @@ POSTHOOK: Input: default@simple_to_mm
 0
 98
 100
+PREHOOK: query: create table import_converted0_mm(key int) stored as orc 
tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: create table import_converted0_mm(key int) stored as orc 
tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted0_mm
+PREHOOK: query: import table import_converted0_mm from 
'ql/test/data/exports/export0'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: import table import_converted0_mm from 
'ql/test/data/exports/export0'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted0_mm
+PREHOOK: query: select * from import_converted0_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted0_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted0_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted0_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table import_converted0_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted0_mm
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: drop table import_converted0_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted0_mm
+POSTHOOK: Output: default@import_converted0_mm
 PREHOOK: query: insert into table simple_to_mm select key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
@@ -124,6 +167,14 @@ POSTHOOK: Input: default@intermediate@p=456
 POSTHOOK: Input: default@intermediate@p=457
 POSTHOOK: Output: default@simple_to_mm
 POSTHOOK: Lineage: simple_to_mm.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
 PREHOOK: query: select * from simple_to_mm s3 order by key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@simple_to_mm
@@ -141,6 +192,116 @@ POSTHOOK: Input: default@simple_to_mm
 100
 100
 100
+PREHOOK: query: create table import_converted1_mm(key int) stored as orc 
tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: create table import_converted1_mm(key int) stored as orc 
tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: import table import_converted1_mm from 
'ql/test/data/exports/export1'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: import table import_converted1_mm from 
'ql/test/data/exports/export1'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: select * from import_converted1_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted1_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted1_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted1_mm
+#### A masked pattern was here ####
+0
+0
+0
+98
+98
+98
+100
+100
+100
+PREHOOK: query: drop table import_converted1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted1_mm
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: drop table import_converted1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted1_mm
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: insert overwrite table simple_to_mm select key from 
intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm
+POSTHOOK: query: insert overwrite table simple_to_mm select key from 
intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm
+POSTHOOK: Lineage: simple_to_mm.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export2'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export2'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+PREHOOK: query: select * from simple_to_mm s4 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm s4 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: create table import_converted2_mm(key int) stored as orc 
tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: create table import_converted2_mm(key int) stored as orc 
tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted2_mm
+PREHOOK: query: import table import_converted2_mm from 
'ql/test/data/exports/export2'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: import table import_converted2_mm from 
'ql/test/data/exports/export2'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted2_mm
+PREHOOK: query: select * from import_converted2_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted2_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted2_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted2_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table import_converted2_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted2_mm
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: drop table import_converted2_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted2_mm
+POSTHOOK: Output: default@import_converted2_mm
 PREHOOK: query: drop table simple_to_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@simple_to_mm
@@ -153,11 +314,11 @@ PREHOOK: query: drop table part_to_mm
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table part_to_mm
 POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) 
stored as orc
+PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) 
stored as orc tblproperties("transactional"="false")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) 
stored as orc
+POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) 
stored as orc tblproperties("transactional"="false")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@part_to_mm
@@ -299,6 +460,144 @@ POSTHOOK: query: drop table part_to_mm
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@part_to_mm
 POSTHOOK: Output: default@part_to_mm
+PREHOOK: query: drop table simple_to_mm_text
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple_to_mm_text
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table simple_to_mm_text(key int) stored as textfile 
tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: create table simple_to_mm_text(key int) stored as textfile 
tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_to_mm_text
+PREHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t1 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t1 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: alter table simple_to_mm_text set 
tblproperties("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@simple_to_mm_text
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: alter table simple_to_mm_text set 
tblproperties("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@simple_to_mm_text
+POSTHOOK: Output: default@simple_to_mm_text
+PREHOOK: query: select * from simple_to_mm_text t2 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t2 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from 
intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t3 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t3 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+0
+0
+98
+98
+98
+100
+100
+100
+PREHOOK: query: insert overwrite table simple_to_mm_text select key from 
intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert overwrite table simple_to_mm_text select key from 
intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t4 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t4 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table simple_to_mm_text
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_to_mm_text
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: drop table simple_to_mm_text
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_to_mm_text
+POSTHOOK: Output: default@simple_to_mm_text
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate

Reply via email to