HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed 
by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbe8e619
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbe8e619
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbe8e619

Branch: refs/heads/branch-3
Commit: cbe8e61980729d8c5d6c86987c6026dc61f394c7
Parents: 33b283c
Author: sergey <[email protected]>
Authored: Mon May 7 13:20:51 2018 -0700
Committer: sergey <[email protected]>
Committed: Tue Jun 5 12:17:16 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/CopyTask.java    |  49 ++------
 .../apache/hadoop/hive/ql/exec/ExportTask.java  |  10 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   3 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   6 +-
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |  45 ++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  34 ++----
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |   2 +-
 .../ql/parse/repl/dump/PartitionExport.java     |   9 +-
 .../hive/ql/parse/repl/dump/TableExport.java    |  11 +-
 .../ql/parse/repl/dump/io/FileOperations.java   |  84 +++++++++++--
 .../apache/hadoop/hive/ql/plan/CopyWork.java    |  18 ---
 .../apache/hadoop/hive/ql/plan/ExportWork.java  |  60 +++++----
 .../apache/hadoop/hive/ql/TestTxnCommands.java  | 122 +++++++++++++++++--
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java  |   9 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |   3 +-
 ql/src/test/queries/clientpositive/mm_exim.q    |   4 +-
 .../results/clientpositive/llap/mm_exim.q.out   |  66 +++++++++-
 17 files changed, 382 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index b0ec5ab..1a8e5e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -63,14 +63,25 @@ public class CopyTask extends Task<CopyWork> implements 
Serializable {
   protected int copyOnePath(Path fromPath, Path toPath) {
     FileSystem dstFs = null;
     try {
-      Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath);
+      Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, 
toPath);
       console.printInfo("Copying data from " + fromPath.toString(), " to "
           + toPath.toString());
 
       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);
 
-      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, 
work.doSkipSourceMmDirs());
+      FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter());
+
+      // TODO: this is very brittle given that Hive supports nested 
directories in the tables.
+      //       The caller should pass a flag explicitly telling us if the 
directories in the
+      //       input are data, or parent of data. For now, retain this for 
backward compat.
+      if (srcs != null && srcs.length == 1 && srcs[0].isDirectory()
+          /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) -  
still broken for partitions*/) {
+        Utilities.FILE_OP_LOGGER.debug(
+            "Recursing into a single child directory {}", 
srcs[0].getPath().getName());
+        srcs = srcFs.listStatus(srcs[0].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+      }
+
       if (srcs == null || srcs.length == 0) {
         if (work.isErrorOnSrcEmpty()) {
           console.printError("No files matching path: " + fromPath.toString());
@@ -107,40 +118,6 @@ public class CopyTask extends Task<CopyWork> implements 
Serializable {
     }
   }
 
-  // Note: initially copied from LoadSemanticAnalyzer.
-  private static FileStatus[] matchFilesOrDir(
-      FileSystem fs, Path path, boolean isSourceMm) throws IOException {
-    if (!fs.exists(path)) return null;
-    if (!isSourceMm) return matchFilesOneDir(fs, path, null);
-    // Note: this doesn't handle list bucketing properly; neither does the 
original code.
-    FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter());
-    if (mmDirs == null || mmDirs.length == 0) return null;
-    List<FileStatus> allFiles = new ArrayList<FileStatus>();
-    for (FileStatus mmDir : mmDirs) {
-      if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-        Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + 
mmDir.getPath());
-      }
-      matchFilesOneDir(fs, mmDir.getPath(), allFiles);
-    }
-    return allFiles.toArray(new FileStatus[allFiles.size()]);
-  }
-
-  private static FileStatus[] matchFilesOneDir(
-      FileSystem fs, Path path, List<FileStatus> result) throws IOException {
-    FileStatus[] srcs = fs.globStatus(path, new EximPathFilter());
-    if (srcs != null && srcs.length == 1) {
-      if (srcs[0].isDirectory()) {
-        srcs = fs.listStatus(srcs[0].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-      }
-    }
-    if (result != null && srcs != null) {
-      for (int i = 0; i < srcs.length; ++i) {
-        result.add(srcs[i]);
-      }
-    }
-    return srcs;
-  }
-
   private static final class EximPathFilter implements PathFilter {
     @Override
     public boolean accept(Path p) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index e3af4f9..3c6a606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -47,15 +47,13 @@ public class ExportTask extends Task<ExportWork> implements 
Serializable {
   protected int execute(DriverContext driverContext) {
     try {
       // Also creates the root directory
-      TableExport.Paths exportPaths =
-          new TableExport.Paths(work.getAstRepresentationForErrorMsg(), 
work.getExportRootDir(),
-              conf, false);
+      TableExport.Paths exportPaths = new TableExport.Paths(
+          work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), 
conf, false);
       Hive db = getHive();
       LOG.debug("Exporting data to: {}", exportPaths.exportRootDir());
       work.acidPostProcess(db);
-      TableExport tableExport = new TableExport(
-          exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, 
null, conf
-      );
+      TableExport tableExport = new TableExport(exportPaths, 
work.getTableSpec(),
+          work.getReplicationSpec(), db, null, conf, work.getMmContext());
       if (!tableExport.write()) {
         throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg());
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 31846a3..406bea0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -72,6 +73,7 @@ import java.util.regex.Pattern;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.commons.codec.binary.Base64;
@@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 88d352b..ccdf04a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -64,6 +64,7 @@ import 
org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -288,7 +289,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
       if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
         tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, 
tblName, validTxnList));
       }
-      new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, 
distCpDoAsUser, conf).write();
+      MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+      new TableExport(
+          exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, 
conf, mmCtx).write();
+
 
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
     } catch (InvalidTableException te) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index d3c62a2..4a366a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -19,12 +19,17 @@
 package org.apache.hadoop.hive.ql.parse;
 
 
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -32,15 +37,14 @@ import 
org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
 import org.apache.hadoop.hive.ql.plan.ExportWork;
-
-import javax.annotation.Nullable;
-import java.util.Set;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 
 /**
  * ExportSemanticAnalyzer.
  *
  */
 public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
+  private boolean isMmExport = false;
 
   ExportSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -48,7 +52,9 @@ public class ExportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-    rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs));
+    Task<ExportWork> task = analyzeExport(ast, null, db, conf, inputs, 
outputs);
+    isMmExport = task.getWork().getMmContext() != null;
+    rootTasks.add(task);
   }
   /**
    * @param acidTableName - table name in db.table format; not NULL if 
exporting Acid table
@@ -80,12 +86,10 @@ public class ExportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
     try {
       ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true);
-    } catch (SemanticException sme){
-      if ((replicationSpec.isInReplicationScope()) &&
-            ((sme.getCause() instanceof InvalidTableException)
-            || (sme instanceof Table.ValidationFailureSemanticException)
-            )
-          ){
+    } catch (SemanticException sme) {
+      if (!replicationSpec.isInReplicationScope()) throw sme;
+      if ((sme.getCause() instanceof InvalidTableException)
+            || (sme instanceof Table.ValidationFailureSemanticException)) {
         // If we're in replication scope, it's possible that we're running the 
export long after
         // the table was dropped, so the table not existing currently or being 
a different kind of
         // table is not an error - it simply means we should no-op, and let a 
future export
@@ -101,15 +105,26 @@ public class ExportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     // All parsing is done, we're now good to start the export process
     TableExport.Paths exportPaths =
         new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, 
conf, false);
-    TableExport tableExport = new TableExport(exportPaths, ts, 
replicationSpec, db, null, conf);
-    TableExport.AuthEntities authEntities = tableExport.getAuthEntities();
+    // Note: this tableExport is actually never used other than for auth, and 
another one is
+    //       created when the task is executed. So, we don't care about the 
correct MM state here.
+    TableExport.AuthEntities authEntities = new TableExport(
+        exportPaths, ts, replicationSpec, db, null, conf, 
null).getAuthEntities();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
     String exportRootDirName = tmpPath;
+    MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : 
ts.tableHandle);
+
+    Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}",
+        ts == null ? null : ts.tableName, mmCtx);
     // Configure export work
-    ExportWork exportWork =
-        new ExportWork(exportRootDirName, ts, replicationSpec, 
ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName);
+    ExportWork exportWork = new ExportWork(exportRootDirName, ts, 
replicationSpec,
+        ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx);
     // Create an export task and add it as a root task
-    return  TaskFactory.get(exportWork);
+    return TaskFactory.get(exportWork);
+  }
+  
+  @Override
+  public boolean hasTransactionalInQuery() {
+    return isMmExport; // Full ACID export goes thru UpdateDelete analyzer.
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 1a3cef9..cc7f0d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -384,7 +384,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, 
Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext 
x,
-      Long writeId, int stmtId, boolean isSourceMm) {
+      Long writeId, int stmtId) {
     assert table != null;
     assert table.getParameters() != null;
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
@@ -425,9 +425,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     if (replicationSpec.isInReplicationScope()) {
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, 
destPath, x.getConf());
     } else {
-      CopyWork cw = new CopyWork(dataPath, destPath, false);
-      cw.setSkipSourceMmDirs(isSourceMm);
-      copyTask = TaskFactory.get(cw);
+      copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
@@ -482,7 +480,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, 
ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, 
ReplicationSpec replicationSpec,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, 
boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = 
addPartitionDesc.getPartition(0);
     if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -519,9 +517,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         copyTask = ReplCopyTask.getLoadCopyTask(
             replicationSpec, new Path(srcLocation), destPath, x.getConf());
       } else {
-        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
-        cw.setSkipSourceMmDirs(isSourceMm);
-        copyTask = TaskFactory.get(cw);
+        copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), 
destPath, false));
       }
 
       Task<?> addPartTask = TaskFactory.get(
@@ -832,8 +828,6 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws HiveException, IOException, MetaException {
 
-    final boolean isSourceMm = 
AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
-
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
@@ -843,7 +837,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == 
null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm));
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -855,8 +849,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, 
stmtId,
-            isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, 
stmtId);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, 
WriteEntity.WriteType.DDL_NO_LOCK));
@@ -875,7 +868,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       if (isPartitioned(tblDesc)) {
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, 
wh, addPartitionDesc,
-            replicationSpec, x, writeId, stmtId, isSourceMm));
+            replicationSpec, x, writeId, stmtId));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -893,7 +886,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
           checkTargetLocationEmpty(tgtFs, tablePath, 
replicationSpec,x.getLOG());
           t.addDependentTask(loadTable(fromURI, table, false, tablePath, 
replicationSpec, x,
-              writeId, stmtId, isSourceMm));
+              writeId, stmtId));
         }
       }
       x.getTasks().add(t);
@@ -930,7 +923,6 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     Task<?> dropTblTask = null;
-    final boolean isSourceMm = 
AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
 
     // Normally, on import, trying to create a table or a partition in a db 
that does not yet exist
@@ -1014,14 +1006,14 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, 
addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+                addSinglePartition(fromURI, fs, tblDesc, table, wh, 
addPartitionDesc, replicationSpec, x, writeId, stmtId));
             if (updatedMetadata != null) {
               
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
             }
           }
         } else {
           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new 
Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, true, new 
Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
         }
       }
 
@@ -1066,7 +1058,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           if (ptn == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm));
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId));
               if (updatedMetadata != null) {
                 
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
               }
@@ -1083,7 +1075,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             if (replicationSpec.allowReplacementInto(ptn.getParameters())){
               if (!replicationSpec.isMetadataOnly()){
                 x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm));
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId));
               } else {
                 x.getTasks().add(alterSinglePartition(
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, ptn, x));
@@ -1109,7 +1101,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
           loadTable(fromURI, table, replicationSpec.isReplace(), new 
Path(tblDesc.getLocation()),
-            replicationSpec, x, writeId, stmtId, isSourceMm);
+            replicationSpec, x, writeId, stmtId);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 0f42ab8..088b5cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -216,7 +216,7 @@ public final class SemanticAnalyzerFactory {
       case HiveParser.TOK_LOAD:
         return new LoadSemanticAnalyzer(queryState);
       case HiveParser.TOK_EXPORT:
-        if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
+        if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
           return new UpdateDeleteSemanticAnalyzer(queryState);
         }
         return new ExportSemanticAnalyzer(queryState);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 70eb750..d73fc4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,8 @@ import static 
org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
  * it has a blocking queue that stores partitions to be dumped via a producer 
thread.
  * it has a worker thread pool that reads of the queue to perform the various 
tasks.
  */
+// TODO: this object is created once to call one method and then immediately 
destroyed.
+//       So it's basically just a roundabout way to pass arguments to a static 
method. Simplify?
 class PartitionExport {
   private final Paths paths;
   private final PartitionIterable partitionIterable;
@@ -50,16 +53,18 @@ class PartitionExport {
   private final HiveConf hiveConf;
   private final int nThreads;
   private final SessionState callersSession;
+  private final MmContext mmCtx;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;
 
   PartitionExport(Paths paths, PartitionIterable partitionIterable, String 
distCpDoAsUser,
-      HiveConf hiveConf) {
+      HiveConf hiveConf, MmContext mmCtx) {
     this.paths = paths;
     this.partitionIterable = partitionIterable;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
+    this.mmCtx = mmCtx;
     this.nThreads = 
hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
     this.queue = new ArrayBlockingQueue<>(2 * nThreads);
     this.callersSession = SessionState.get();
@@ -106,7 +111,7 @@ class PartitionExport {
           List<Path> dataPathList = 
Utils.getDataPathList(partition.getDataLocation(),
                   forReplicationSpec, hiveConf);
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, 
hiveConf)
+          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, 
hiveConf, mmCtx)
                   .export(forReplicationSpec);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, 
partitionName);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index e801a64..20ff23a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
+// TODO: this object is created once to call one method and then immediately 
destroyed.
+//       So it's basically just a roundabout way to pass arguments to a static 
method. Simplify?
 public class TableExport {
   private static final Logger logger = 
LoggerFactory.getLogger(TableExport.class);
 
@@ -59,9 +62,10 @@ public class TableExport {
   private final String distCpDoAsUser;
   private final HiveConf conf;
   private final Paths paths;
+  private final MmContext mmCtx;
 
   public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec 
replicationSpec, Hive db,
-      String distCpDoAsUser, HiveConf conf) {
+      String distCpDoAsUser, HiveConf conf, MmContext mmCtx) {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
         && replicationSpec.isInReplicationScope())
@@ -76,6 +80,7 @@ public class TableExport {
     this.distCpDoAsUser = distCpDoAsUser;
     this.conf = conf;
     this.paths = paths;
+    this.mmCtx = mmCtx;
   }
 
   public boolean write() throws SemanticException {
@@ -147,13 +152,13 @@ public class TableExport {
           throw new IllegalStateException("partitions cannot be null for 
partitionTable :"
               + tableSpec.tableName);
         }
-        new PartitionExport(paths, partitions, distCpDoAsUser, 
conf).write(replicationSpec);
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf, 
mmCtx).write(replicationSpec);
       } else {
         List<Path> dataPathList = 
Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
                 replicationSpec, conf);
 
         // this is the data copy
-        new FileOperations(dataPathList, paths.dataExportDir(), 
distCpDoAsUser, conf)
+        new FileOperations(dataPathList, paths.dataExportDir(), 
distCpDoAsUser, conf, mmCtx)
             .export(replicationSpec);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 690498f..b61a945 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
+
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
+//TODO: this object is created once to call one method and then immediately 
destroyed.
+//So it's basically just a roundabout way to pass arguments to a static 
method. Simplify?
 public class FileOperations {
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
   private final List<Path> dataPathList;
@@ -45,13 +52,15 @@ public class FileOperations {
   private final String distCpDoAsUser;
   private HiveConf hiveConf;
   private final FileSystem dataFileSystem, exportFileSystem;
+  private final MmContext mmCtx;
 
-  public FileOperations(List<Path> dataPathList, Path exportRootDataDir,
-                        String distCpDoAsUser, HiveConf hiveConf) throws 
IOException {
+  public FileOperations(List<Path> dataPathList, Path exportRootDataDir, 
String distCpDoAsUser,
+      HiveConf hiveConf, MmContext mmCtx) throws IOException {
     this.dataPathList = dataPathList;
     this.exportRootDataDir = exportRootDataDir;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
+    this.mmCtx = mmCtx;
     if ((dataPathList != null) && !dataPathList.isEmpty()) {
       dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
     } else {
@@ -72,17 +81,59 @@ public class FileOperations {
    * This writes the actual data in the exportRootDataDir from the source.
    */
   private void copyFiles() throws IOException, LoginException {
-    for (Path dataPath : dataPathList) {
-      FileStatus[] fileStatuses =
-              LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath);
-      List<Path> srcPaths = new ArrayList<>();
-      for (FileStatus fileStatus : fileStatuses) {
-        srcPaths.add(fileStatus.getPath());
+    if (mmCtx == null) {
+      for (Path dataPath : dataPathList) {
+        copyOneDataPath(dataPath, exportRootDataDir);
       }
-      new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, 
srcPaths);
+    } else {
+      copyMmPath();
+    }
+  }
+
+  private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, 
LoginException {
+    FileStatus[] fileStatuses = 
LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath);
+    List<Path> srcPaths = new ArrayList<>();
+    for (FileStatus fileStatus : fileStatuses) {
+      srcPaths.add(fileStatus.getPath());
     }
+
+    new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths);
   }
 
+  private void copyMmPath() throws LoginException, IOException {
+    assert dataPathList.size() == 1;
+    ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, 
mmCtx.getFqTableName());
+    Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0));
+    List<Path> validPaths = getMmValidPaths(ids, fromPath);
+    String fromPathStr = fromPath.toString();
+    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+       fromPathStr += Path.SEPARATOR;
+    }
+    for (Path validPath : validPaths) {
+      // Export valid directories with a modified name so they don't look like 
bases/deltas.
+      // We could also dump the delta contents all together and rename the 
files if names collide.
+      String mmChildPath = "export_old_" + 
validPath.toString().substring(fromPathStr.length());
+      Path destPath = new Path(exportRootDataDir, mmChildPath);
+      exportFileSystem.mkdirs(destPath);
+      copyOneDataPath(validPath, destPath);
+    }
+  }
+
+  private List<Path> getMmValidPaths(ValidWriteIdList ids, Path fromPath) 
throws IOException {
+    Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", 
fromPath);
+    AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, 
ids);
+    List<Path> validPaths = new ArrayList<>();
+    Path base = acidState.getBaseDirectory();
+    if (base != null) {
+      validPaths.add(base);
+    }
+    for (ParsedDelta pd : acidState.getCurrentDirectories()) {
+      validPaths.add(pd.getPath());
+    }
+    return validPaths;
+  }
+
+
   /**
    * This needs the root data directory to which the data needs to be exported 
to.
    * The data export here is a list of files either in table/partition that 
are written to the _files
@@ -90,8 +141,19 @@ public class FileOperations {
    */
   private void exportFilesAsList() throws SemanticException, IOException {
     try (BufferedWriter writer = writer()) {
-      for (Path dataPath : dataPathList) {
-        writeFilesList(listFilesInDir(dataPath), writer, 
AcidUtils.getAcidSubDir(dataPath));
+      if (mmCtx != null) {
+        assert dataPathList.size() == 1;
+        Path dataPath = dataPathList.get(0);
+        ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(
+            hiveConf, mmCtx.getFqTableName());
+        List<Path> validPaths = getMmValidPaths(ids, dataPath);
+        for (Path mmPath : validPaths) {
+          writeFilesList(listFilesInDir(mmPath), writer, 
AcidUtils.getAcidSubDir(dataPath));
+        }
+      } else {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, 
AcidUtils.getAcidSubDir(dataPath));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index c0e4a43..018983f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,15 +33,10 @@ public class CopyWork implements Serializable {
   private Path[] fromPath;
   private Path[] toPath;
   private boolean errorOnSrcEmpty;
-  private boolean isSkipMmDirs = false;
 
   public CopyWork() {
   }
 
-  public CopyWork(final Path fromPath, final Path toPath) {
-    this(fromPath, toPath, true);
-  }
-
   public CopyWork(final Path fromPath, final Path toPath, boolean 
errorOnSrcEmpty) {
     this(new Path[] { fromPath }, new Path[] { toPath });
     this.setErrorOnSrcEmpty(errorOnSrcEmpty);
@@ -92,17 +87,4 @@ public class CopyWork implements Serializable {
   public boolean isErrorOnSrcEmpty() {
     return errorOnSrcEmpty;
   }
-
-  /**
-   *  Whether the copy should ignore MM directories in the source, and copy 
their content to
-   * destination directly, rather than copying the directories themselves.
-   * */
-  public void setSkipSourceMmDirs(boolean isMm) {
-    this.isSkipMmDirs = isMm;
-  }
-
-  public boolean doSkipSourceMmDirs() {
-    return isSkipMmDirs ;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index 72ce798..d91569e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -17,39 +17,65 @@
  */
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-
 @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, 
Explain.Level.DEFAULT,
     Explain.Level.EXTENDED })
 public class ExportWork implements Serializable {
-  private Logger LOG = LoggerFactory.getLogger(ExportWork.class);
+  private static Logger LOG = LoggerFactory.getLogger(ExportWork.class);
 
   private static final long serialVersionUID = 1L;
 
+  public final static class MmContext {
+    private final String fqTableName;
+
+    private MmContext(String fqTableName) {
+      this.fqTableName = fqTableName;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + fqTableName + "]";
+    }
+
+    public static MmContext createIfNeeded(Table t) {
+      if (t == null) return null;
+      if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null;
+      return new MmContext(AcidUtils.getFullTableName(t.getDbName(), 
t.getTableName()));
+    }
+
+    public String getFqTableName() {
+      return fqTableName;
+    }
+  }
+
   private final String exportRootDirName;
   private TableSpec tableSpec;
   private ReplicationSpec replicationSpec;
   private String astRepresentationForErrorMsg;
-  private String qualifiedTableName;
+  private String acidFqTableName;
+  private final MmContext mmContext;
 
   /**
-   * @param qualifiedTable if exporting Acid table, this is temp table - null 
otherwise
+   * @param acidFqTableName if exporting Acid table, this is temp table - null 
otherwise
    */
   public ExportWork(String exportRootDirName, TableSpec tableSpec, 
ReplicationSpec replicationSpec,
-      String astRepresentationForErrorMsg, String qualifiedTable) {
+      String astRepresentationForErrorMsg, String acidFqTableName, MmContext 
mmContext) {
     this.exportRootDirName = exportRootDirName;
     this.tableSpec = tableSpec;
     this.replicationSpec = replicationSpec;
     this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
-    this.qualifiedTableName = qualifiedTable;
+    this.mmContext = mmContext;
+    this.acidFqTableName = acidFqTableName;
   }
 
   public String getExportRootDir() {
@@ -60,24 +86,16 @@ public class ExportWork implements Serializable {
     return tableSpec;
   }
 
-  public void setTableSpec(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
-  }
-
   public ReplicationSpec getReplicationSpec() {
     return replicationSpec;
   }
 
-  public void setReplicationSpec(ReplicationSpec replicationSpec) {
-    this.replicationSpec = replicationSpec;
-  }
-
   public String getAstRepresentationForErrorMsg() {
     return astRepresentationForErrorMsg;
   }
 
-  public void setAstRepresentationForErrorMsg(String 
astRepresentationForErrorMsg) {
-    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  public MmContext getMmContext() {
+    return mmContext;
   }
 
   /**
@@ -88,10 +106,10 @@ public class ExportWork implements Serializable {
    * for more info.
    */
   public void acidPostProcess(Hive db) throws HiveException {
-    if(qualifiedTableName != null) {
-      LOG.info("Swapping export of " + tableSpec.tableName + " to " + 
qualifiedTableName +
+    if (acidFqTableName != null) {
+      LOG.info("Swapping export of " + tableSpec.tableName + " to " + 
acidFqTableName +
           " using partSpec=" + tableSpec.partSpec);
-      tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, 
true);
+      tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 6faba42..3e2784b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,9 +17,24 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
@@ -47,13 +62,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The LockManager is not ready, but for no-concurrency straight-line path we 
can
  * test AC=true, and AC=false with commit/rollback/exception and test 
resulting data.
@@ -152,6 +160,106 @@ public class TestTxnCommands extends 
TxnCommandsBaseForTests {
     Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
   }
 
+  @Test
+  public void testMmExim() throws Exception {
+    String tableName = "mm_table", importName = tableName + "_import";
+    runStatementOnDriver("drop table if exists " + tableName);
+    runStatementOnDriver(String.format("create table %s (a int, b int) stored 
as orc " +
+        "TBLPROPERTIES ('transactional'='true', 
'transactional_properties'='insert_only')",
+        tableName));
+
+    // Regular insert: export some MM deltas, then import into a new table.
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver(String.format("insert into %s (a,b) %s",
+        tableName, makeValuesClause(rows1)));
+    runStatementOnDriver(String.format("insert into %s (a,b) %s",
+        tableName, makeValuesClause(rows1)));
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    org.apache.hadoop.hive.metastore.api.Table table = 
msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(hiveConf);
+    Path exportPath = new Path(table.getSd().getLocation() + "_export");
+    fs.delete(exportPath, true);
+    runStatementOnDriver(String.format("export table %s to '%s'", tableName, 
exportPath));
+    List<String> paths = listPathsRecursive(fs, exportPath);
+    verifyMmExportPaths(paths, 2);
+    runStatementOnDriver(String.format("import table %s from '%s'", 
importName, exportPath));
+    org.apache.hadoop.hive.metastore.api.Table imported = 
msClient.getTable("default", importName);
+    Assert.assertEquals(imported.toString(), "insert_only",
+        imported.getParameters().get("transactional_properties"));
+    Path importPath = new Path(imported.getSd().getLocation());
+    FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+    Assert.assertEquals(Arrays.toString(stat), 1, stat.length);
+    assertIsDelta(stat[0]);
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows1));
+    allData.sort(null);
+    Collections.sort(allData);
+    List<String> rs = runStatementOnDriver(
+        String.format("select a,b from %s order by a,b", importName));
+    Assert.assertEquals("After import: " + rs, allData, rs);
+    runStatementOnDriver("drop table if exists " + importName);
+    
+    // Do insert overwrite to create some invalid deltas, and import into a 
non-MM table.
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver(String.format("insert overwrite table %s %s",
+        tableName, makeValuesClause(rows2)));
+    fs.delete(exportPath, true);
+    runStatementOnDriver(String.format("export table %s to '%s'", tableName, 
exportPath));
+    paths = listPathsRecursive(fs, exportPath);
+    verifyMmExportPaths(paths, 1);
+    runStatementOnDriver(String.format("create table %s (a int, b int) stored 
as orc " +
+        "TBLPROPERTIES ('transactional'='false')", importName));
+    runStatementOnDriver(String.format("import table %s from '%s'", 
importName, exportPath));
+    imported = msClient.getTable("default", importName);
+    Assert.assertNull(imported.toString(), 
imported.getParameters().get("transactional"));
+    Assert.assertNull(imported.toString(),
+        imported.getParameters().get("transactional_properties"));
+    importPath = new Path(imported.getSd().getLocation());
+    stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+    allData = stringifyValues(rows2);
+    Collections.sort(allData);
+    rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", 
importName));
+    Assert.assertEquals("After import: " + rs, allData, rs);
+    runStatementOnDriver("drop table if exists " + importName);
+    runStatementOnDriver("drop table if exists " + tableName);
+    msClient.close();
+  }
+
+  private void assertIsDelta(FileStatus stat) {
+    Assert.assertTrue(stat.toString(),
+        stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+  }
+
+  private void verifyMmExportPaths(List<String> paths, int deltasOrBases) {
+    // 1 file, 1 dir for each, for now. Plus export "data" dir.
+    // This could be changed to a flat file list later.
+    Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size());
+    // No confusing directories in export.
+    for (String path : paths) {
+      Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX));
+      Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX));
+    }
+  }
+
+  private List<String> listPathsRecursive(FileSystem fs, Path path) throws 
IOException {
+    List<String> paths = new ArrayList<>();
+    LinkedList<Path> queue = new LinkedList<>();
+    queue.add(path);
+    while (!queue.isEmpty()) {
+      Path next = queue.pollFirst();
+      FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter);
+      for (FileStatus stat : stats) {
+        Path child = stat.getPath();
+        paths.add(child.toString());
+        if (stat.isDirectory()) {
+          queue.add(child);
+        }
+      }
+    }
+    return paths;
+  }
+
+  
   /**
    * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
    * @throws Exception

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 91f2d13..8d3ab77 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -477,6 +477,7 @@ 
target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
   }
   private void testMM(boolean existingTable, boolean isSourceMM) throws 
Exception {
     HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    hiveConf.setBoolean("mapred.input.dir.recursive", true);
 
     int[][] data = {{1,2}, {3, 4}, {5, 6}};
     runStatementOnDriver("drop table if exists T");
@@ -500,9 +501,10 @@ 
target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
     //verify that we are indeed doing an Acid write (import)
     rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by 
INPUT__FILE__NAME");
     Assert.assertEquals(3, rs.size());
-    
Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0"));
-    
Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0"));
-    
Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+    for (String s : rs) {
+      Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/"));
+      Assert.assertTrue(s, s.endsWith("/000000_0"));
+    }
   }
   private void checkResult(String[][] expectedResult, String query, boolean 
isVectorized,
       String msg) throws Exception{
@@ -516,6 +518,7 @@ 
target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
   @Test
   public void testMMExportAborted() throws Exception {
     HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    hiveConf.setBoolean("mapred.input.dir.recursive", true);
     int[][] data = {{1, 2}, {3, 4}, {5, 6}};
     int[][] dataAbort = {{10, 2}};
     runStatementOnDriver("drop table if exists T");

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 25a63d8..8a55d8c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -217,7 +217,8 @@ public abstract class TxnCommandsBaseForTests {
   void checkExpected(List<String> rs, String[][] expected, String msg, Logger 
LOG, boolean checkFileName) {
     LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
     logResult(LOG, rs);
-    Assert.assertEquals( testName.getMethodName() + ": " + msg, 
expected.length, rs.size());
+    Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs,
+        expected.length, rs.size());
     //verify data and layout
     for(int i = 0; i < expected.length; i++) {
       Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), 
rs.get(i).startsWith(expected[i][0]));

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/queries/clientpositive/mm_exim.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_exim.q 
b/ql/src/test/queries/clientpositive/mm_exim.q
index b0c030d..debcc24 100644
--- a/ql/src/test/queries/clientpositive/mm_exim.q
+++ b/ql/src/test/queries/clientpositive/mm_exim.q
@@ -57,13 +57,13 @@ drop table import1_mm;
 
 drop table import2_mm;
 import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart';
-desc import2_mm;
+desc formatted import2_mm;
 select * from import2_mm order by key, p;
 drop table import2_mm;
 
 drop table import3_mm;
 import table import3_mm from 'ql/test/data/exports/intermmediate_part';
-desc import3_mm;
+desc formatted import3_mm;
 select * from import3_mm order by key, p;
 drop table import3_mm;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/results/clientpositive/llap/mm_exim.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_exim.q.out 
b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
index 8a43d0f..8dfc738 100644
--- a/ql/src/test/results/clientpositive/llap/mm_exim.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
@@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT
 #### A masked pattern was here ####
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@import2_mm
-PREHOOK: query: desc import2_mm
+PREHOOK: query: desc formatted import2_mm
 PREHOOK: type: DESCTABLE
 PREHOOK: Input: default@import2_mm
-POSTHOOK: query: desc import2_mm
+POSTHOOK: query: desc formatted import2_mm
 POSTHOOK: type: DESCTABLE
 POSTHOOK: Input: default@import2_mm
+# col_name             data_type               comment             
 key                    int                                         
 p                      int                                         
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            MANAGED_TABLE            
+Table Parameters:               
+       bucketing_version       2                   
+       numFiles                3                   
+       numRows                 6                   
+       rawDataSize             37                  
+       totalSize               43                  
+       transactional           true                
+       transactional_properties        insert_only         
+#### A masked pattern was here ####
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.TextInputFormat         
+OutputFormat:          
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat       
+Compressed:            No                       
+Num Buckets:           -1                       
+Bucket Columns:        []                       
+Sort Columns:          []                       
+Storage Desc Params:            
+       serialization.format    1                   
 PREHOOK: query: select * from import2_mm order by key, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@import2_mm
@@ -338,18 +366,46 @@ POSTHOOK: Output: default@import3_mm
 POSTHOOK: Output: default@import3_mm@p=455
 POSTHOOK: Output: default@import3_mm@p=456
 POSTHOOK: Output: default@import3_mm@p=457
-PREHOOK: query: desc import3_mm
+PREHOOK: query: desc formatted import3_mm
 PREHOOK: type: DESCTABLE
 PREHOOK: Input: default@import3_mm
-POSTHOOK: query: desc import3_mm
+POSTHOOK: query: desc formatted import3_mm
 POSTHOOK: type: DESCTABLE
 POSTHOOK: Input: default@import3_mm
+# col_name             data_type               comment             
 key                    int                                         
-p                      int                                         
                 
 # Partition Information                 
 # col_name             data_type               comment             
 p                      int                                         
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            MANAGED_TABLE            
+Table Parameters:               
+       bucketing_version       2                   
+       numFiles                3                   
+       numPartitions           3                   
+       numRows                 6                   
+       rawDataSize             13                  
+       totalSize               19                  
+       transactional           true                
+       transactional_properties        insert_only         
+#### A masked pattern was here ####
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.TextInputFormat         
+OutputFormat:          
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat       
+Compressed:            No                       
+Num Buckets:           -1                       
+Bucket Columns:        []                       
+Sort Columns:          []                       
+Storage Desc Params:            
+       serialization.format    1                   
 PREHOOK: query: select * from import3_mm order by key, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@import3_mm

Reply via email to