HIVE-14645 : table conversion to and from MM (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e083d33a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e083d33a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e083d33a

Branch: refs/heads/hive-14535
Commit: e083d33ac484d0ec99bc7789d7fc633723a1d634
Parents: 0f3998a
Author: Sergey Shelukhin <[email protected]>
Authored: Thu Oct 27 18:45:28 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Thu Oct 27 18:45:28 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |   9 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   4 +-
 .../hadoop/hive/metastore/MetaStoreUtils.java   |   1 +
 .../hadoop/hive/metastore/ObjectStore.java      |   1 +
 .../TransactionalValidationListener.java        |   7 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  12 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 222 ++++-
 .../hadoop/hive/ql/exec/ImportCommitTask.java   |   1 +
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   1 +
 .../apache/hadoop/hive/ql/exec/StatsTask.java   |   4 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java       |   3 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   4 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   6 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  14 +-
 .../apache/hadoop/hive/ql/metadata/Table.java   |   3 +
 .../BucketingSortingReduceSinkOptimizer.java    |   2 +-
 .../hive/ql/optimizer/StatsOptimizer.java       |   2 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |  28 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  14 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   2 +-
 .../queries/clientpositive/mm_conversions.q     |  85 ++
 .../clientpositive/llap/mm_conversions.q.out    | 951 +++++++++++++++++++
 22 files changed, 1317 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index c61b63a..78ec3ad 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +93,14 @@ public class ValidWriteIds {
     conf.set(createConfKey(dbName, tblName), source);
   }
 
+  public static void clearConf(HiveConf conf, String dbName, String tblName) {
+    if (LOG.isDebugEnabled()) {
+      // TODO# remove
+      LOG.debug("Unsetting " + createConfKey(dbName, tblName));
+    }
+    conf.unset(createConfKey(dbName, tblName));
+  }
+
   public String toString() {
    // TODO: lifted from ACID config implementation... optimize if needed? e.g. ranges, base64
     StringBuilder buf = new StringBuilder();
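
[Illustration, not part of the patch] A minimal standalone sketch of the per-table conf set/unset pattern that the new clearConf() mirrors; the key format below is a hypothetical placeholder, since createConfKey() is not shown in this diff.

import org.apache.hadoop.conf.Configuration;

public class WriteIdConfSketch {
  // Hypothetical stand-in for ValidWriteIds.createConfKey(dbName, tblName).
  private static String confKey(String dbName, String tblName) {
    return "hive.sketch.valid.writeids." + dbName + "." + tblName;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Publish a (fake) valid-write-ID list for db1.t1, like the conf.set(createConfKey(...), source) call above.
    conf.set(confKey("db1", "t1"), "5:1:2:3");
    System.out.println("after set:   " + conf.get(confKey("db1", "t1")));
    // If the table turns out not to be insert-only, clear any stale entry,
    // which is what Driver.acquireWriteIds() now does via clearConf().
    conf.unset(confKey("db1", "t1"));
    System.out.println("after unset: " + conf.get(confKey("db1", "t1")));
  }
}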

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 4436f3a..68b00f3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6457,6 +6457,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
             tbl.setMmNextWriteId(writeId + 1);
             ms.alterTable(dbName, tblName, tbl);
+
             ok = true;
           } finally {
             if (!ok) {
@@ -6608,7 +6609,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               Iterator<Long> iter = ids.iterator();
               long oldWatermarkId = watermarkId;
               while (iter.hasNext()) {
-                if (iter.next() != watermarkId + 1) break;
+                Long nextWriteId = iter.next();
+                if (nextWriteId != watermarkId + 1) break;
                 ++watermarkId;
               }
               long removed = watermarkId - oldWatermarkId;
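
[Illustration, not part of the patch] A self-contained sketch of the watermark-advance loop above, assuming ids is a sorted set of committed write IDs: the watermark advances until the first gap, and everything at or below it can be pruned.

import java.util.Iterator;
import java.util.TreeSet;

public class WatermarkSketch {
  public static void main(String[] args) {
    TreeSet<Long> ids = new TreeSet<>();   // committed write IDs above the watermark
    ids.add(2L); ids.add(3L); ids.add(5L); // write ID 4 is still outstanding
    long watermarkId = 1;
    long oldWatermarkId = watermarkId;
    Iterator<Long> iter = ids.iterator();
    while (iter.hasNext()) {
      Long nextWriteId = iter.next();
      if (nextWriteId != watermarkId + 1) break; // stop at the first gap
      ++watermarkId;
    }
    long removed = watermarkId - oldWatermarkId;
    // Prints: watermark 1 -> 3, 2 ids can be pruned
    System.out.println("watermark " + oldWatermarkId + " -> " + watermarkId
        + ", " + removed + " ids can be pruned");
  }
}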

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 3ee1f1c..45eef36 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1892,6 +1892,7 @@ public class MetaStoreUtils {
    * @param params table properties
    * @return true if table is an INSERT_ONLY table, false otherwise
    */
+  // TODO# also check that transactional is true
   public static boolean isInsertOnlyTable(Map<String, String> params) {
    String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
    return transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp);

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 8ad7059..a1b3a09 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -636,6 +636,7 @@ public class ObjectStore implements RawStore, Configurable {
 
     transactionStatus = TXN_STATUS.COMMITED;
     try {
+      LOG.error("TODO# grrrrr");
       currentTransaction.commit();
     } catch (Exception ex) {
       Throwable candidate = ex;

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index f942479..ed05381 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -139,7 +139,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
       hasValidTransactionalValue = true;
     }
 
-    if (!hasValidTransactionalValue) {
+    if (!hasValidTransactionalValue && !MetaStoreUtils.isInsertOnlyTable(oldTable.getParameters())) {
       // if here, there is attempt to set transactional to something other than 'true'
       // and NOT the same value it was before
       throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
@@ -156,8 +156,9 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
         // 'transactional_properties' must match the old value. Any attempt to alter the previous
         // value will throw an error. An exception will still be thrown if the previous value was
         // null and an attempt is made to set it. This behaviour can be changed in the future.
-        if (oldTransactionalPropertiesValue == null
-            || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) {
+        if ((oldTransactionalPropertiesValue == null
+            || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue))
+            && !MetaStoreUtils.isInsertOnlyTable(oldTable.getParameters())) {
           throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be "
              + "altered after the table is created");
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 19f743b..8f3ab3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1580,8 +1580,13 @@ public class Driver implements CommandProcessor {
   private static void acquireWriteIds(QueryPlan plan, HiveConf conf) throws HiveException {
     // Output IDs are put directly into FileSinkDesc; here, we only need to take care of inputs.
     for (ReadEntity input : plan.getInputs()) {
-      Table t = extractMmTable(input);
+      Table t = extractTable(input);
       if (t == null) continue;
+      Utilities.LOG14535.info("Checking " + t.getTableName() + " for being a MM table: " + t.getParameters());
+      if (!MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+        ValidWriteIds.clearConf(conf, t.getDbName(), t.getTableName());
+        continue;
+      }
       ValidWriteIds ids = Hive.get().getValidWriteIdsForTable(t.getDbName(), t.getTableName());
       ids.addToConf(conf, t.getDbName(), t.getTableName());
       if (plan.getFetchTask() != null) {
@@ -1590,7 +1595,7 @@ public class Driver implements CommandProcessor {
     }
   }
 
-  private static Table extractMmTable(ReadEntity input) {
+  private static Table extractTable(ReadEntity input) {
     Table t = null;
     switch (input.getType()) {
       case TABLE:
@@ -1602,8 +1607,7 @@ public class Driver implements CommandProcessor {
         break;
       default: return null;
     }
-    return (t != null && !t.isTemporary()
-        && MetaStoreUtils.isInsertOnlyTable(t.getParameters())) ? t : null;
+    return (t != null && !t.isTemporary()) ? t : null;
   }
 
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 7cf83d8..b5ec3cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -91,6 +92,7 @@ import 
org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -163,8 +165,10 @@ import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.GrantDesc;
 import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.MsckDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
@@ -238,6 +242,7 @@ import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 /**
  * DDLTask implementation.
@@ -1788,7 +1793,7 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
   private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException 
{
 
     Table tbl = db.getTable(desc.getTableName());
-    if (!AcidUtils.isAcidTable(tbl)) {
+    if (!AcidUtils.isFullAcidTable(tbl)) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, 
tbl.getDbName(),
           tbl.getTableName());
     }
@@ -3349,14 +3354,16 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     // Don't change the table object returned by the metastore, as we'll mess 
with it's caches.
     Table oldTbl = tbl;
     tbl = oldTbl.copy();
+    // Handle child tasks here. We could add them directly whereever we need,
+    // but let's make it a little bit more explicit.
     if (allPartitions != null) {
       // Alter all partitions
       for (Partition part : allPartitions) {
-        alterTableOrSinglePartition(alterTbl, tbl, part);
+        addChildTasks(alterTableOrSinglePartition(alterTbl, tbl, part));
       }
     } else {
       // Just alter the table
-      alterTableOrSinglePartition(alterTbl, tbl, null);
+      addChildTasks(alterTableOrSinglePartition(alterTbl, tbl, null));
     }
 
     if (allPartitions == null) {
@@ -3368,6 +3375,7 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       }
     }
 
+    // TODO# WRONG!! HERE
     try {
       if (allPartitions == null) {
         db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), 
alterTbl.getEnvironmentContext());
@@ -3429,6 +3437,13 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     return addIfAbsentByName(newWriteEntity, work.getOutputs());
   }
 
+  private void addChildTasks(List<Task<?>> extraTasks) {
+    if (extraTasks == null) return;
+    for (Task<?> newTask : extraTasks) {
+      addDependentTask(newTask);
+    }
+  }
+
   private boolean isSchemaEvolutionEnabled(Table tbl) {
     boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
     if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
@@ -3437,8 +3452,8 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     return false;
   }
 
-  private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, 
Partition part)
-      throws HiveException {
+  private List<Task<?>> alterTableOrSinglePartition(
+      AlterTableDesc alterTbl, Table tbl, Partition part) throws HiveException 
{
 
     if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
       tbl.setDbName(Utilities.getDatabaseName(alterTbl.getNewName()));
@@ -3576,20 +3591,9 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       }
       sd.setCols(alterTbl.getNewCols());
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
-      if (part != null) {
-        part.getTPartition().getParameters().putAll(alterTbl.getProps());
-      } else {
-        tbl.getTTable().getParameters().putAll(alterTbl.getProps());
-      }
+      return alterTableAddProps(alterTbl, tbl, part);
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.DROPPROPS) {
-      Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
-      while (keyItr.hasNext()) {
-        if (part != null) {
-          part.getTPartition().getParameters().remove(keyItr.next());
-        } else {
-          tbl.getTTable().getParameters().remove(keyItr.next());
-        }
-      }
+      return alterTableDropProps(alterTbl, tbl, part);
     } else if (alterTbl.getOp() == 
AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) {
       StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : 
part.getTPartition().getSd());
       sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
@@ -3724,12 +3728,12 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     } else if (alterTbl.getOp() == AlterTableTypes.ALTERBUCKETNUM) {
       if (part != null) {
         if (part.getBucketCount() == alterTbl.getNumberBuckets()) {
-          return 0;
+          return null;
         }
         part.setBucketCount(alterTbl.getNumberBuckets());
       } else {
         if (tbl.getNumBuckets() == alterTbl.getNumberBuckets()) {
-          return 0;
+          return null;
         }
         tbl.setNumBuckets(alterTbl.getNumberBuckets());
       }
@@ -3737,7 +3741,183 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, 
alterTbl.getOp().toString());
     }
 
-    return 0;
+    return null;
+  }
+
+  private List<Task<?>> alterTableDropProps(
+      AlterTableDesc alterTbl, Table tbl, Partition part) throws HiveException 
{
+    List<Task<?>> result = null;
+    if (part == null) {
+      Set<String> removedSet = alterTbl.getProps().keySet();
+      boolean isFromMmTable = 
MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()),
+          isRemoved = 
removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES)
+              || 
removedSet.contains(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+      if (isFromMmTable && isRemoved) {
+        result = generateRemoveMmTasks(tbl);
+      }
+    }
+    Iterator<String> keyItr = alterTbl.getProps().keySet().iterator();
+    while (keyItr.hasNext()) {
+      if (part != null) {
+        part.getTPartition().getParameters().remove(keyItr.next());
+      } else {
+        tbl.getTTable().getParameters().remove(keyItr.next());
+      }
+    }
+    return result;
+  }
+
+  private List<Task<?>> generateRemoveMmTasks(Table tbl) throws HiveException {
+    // To avoid confusion from nested MM directories when table is converted back and forth,
+    // we will do the following - we will rename mm_ dirs to remove the prefix; we will also
+    // delete any directories that are not committed. Note that this relies on locks.
+    // Note also that we only do the renames AFTER the metastore operation commits.
+    // Deleting uncommitted things is safe, but moving stuff before we convert is data loss.
+    List<Path> allMmDirs = new ArrayList<>();
+    if (tbl.isStoredAsSubDirectories()) {
+      // TODO: support this?
+      throw new HiveException("Converting list bucketed tables stored as subdirectories "
+          + " to and from MM is not supported");
+    }
+    Hive db = getHive();
+    ValidWriteIds ids = db.getValidWriteIdsForTable(tbl.getDbName(), 
tbl.getTableName());
+    if (tbl.getPartitionKeys().size() > 0) {
+      PartitionIterable parts = new PartitionIterable(db, tbl, null,
+          HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+      Iterator<Partition> partIter = parts.iterator();
+      while (partIter.hasNext()) {
+        Partition part = partIter.next();
+        checkMmLb(part);
+        handleRemoveMm(part.getDataLocation(), ids, allMmDirs);
+      }
+    } else {
+      checkMmLb(tbl);
+      handleRemoveMm(tbl.getDataLocation(), ids, allMmDirs);
+    }
+    List<Path> targetPaths = new ArrayList<>(allMmDirs.size());
+    int prefixLen = ValidWriteIds.MM_PREFIX.length();
+    for (int i = 0; i < allMmDirs.size(); ++i) {
+      Path src = allMmDirs.get(i);
+      Path tgt = new Path(src.getParent(), src.getName().substring(prefixLen + 
1));
+      Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+      targetPaths.add(tgt);
+    }
+    // Don't set inputs and outputs - the locks have already been taken so 
it's pointless.
+    MoveWork mw = new MoveWork(null, null, null, null, false);
+    mw.setMultiFilesDesc(new LoadMultiFilesDesc(allMmDirs, targetPaths, true, 
null, null));
+    return Lists.<Task<?>>newArrayList(TaskFactory.get(mw, conf));
+  }
+
+  private void checkMmLb(Table tbl) throws HiveException {
+    if (!tbl.isStoredAsSubDirectories()) return;
+    // TODO: support this?
+    throw new HiveException("Converting list bucketed tables stored as subdirectories "
+        + " to and from MM is not supported");
+  }
+
+  private void checkMmLb(Partition part) throws HiveException {
+    if (!part.isStoredAsSubDirectories()) return;
+    // TODO: support this?
+    throw new HiveException("Converting list bucketed tables stored as subdirectories "
+        + " to and from MM is not supported. Please create a table in the desired format.");
+  }
+
+  private void handleRemoveMm(
+      Path path, ValidWriteIds ids, List<Path> result) throws HiveException {
+    // Note: doesn't take LB into account; that is not presently supported 
here (throws above).
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      for (FileStatus file : fs.listStatus(path)) {
+        Path childPath = file.getPath();
+        if (!file.isDirectory()) {
+          ensureDelete(fs, childPath, "a non-directory file");
+          continue;
+        }
+        Long writeId = ValidWriteIds.extractWriteId(childPath);
+        if (writeId == null) {
+          ensureDelete(fs, childPath, "an unknown directory");
+        } else if (!ids.isValid(writeId)) {
+          // Assume no concurrent active writes - we rely on locks here. We 
could check and fail.
+          ensureDelete(fs, childPath, "an uncommitted directory");
+        } else {
+          result.add(childPath);
+        }
+      }
+    } catch (IOException ex) {
+      throw new HiveException(ex);
+    }
+  }
+
+  private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException {
+    Utilities.LOG14535.info("Deleting " + what + " " + path);
+    try {
+      if (!fs.delete(path, true)) throw new IOException("delete returned false");
+    } catch (Exception ex) {
+      String error = "Couldn't delete " + path + "; cannot remove MM setting from the table";
+      LOG.error(error, ex);
+      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+    }
+  }
+
+  private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException {
+    // We will move all the files in the table/partition directories into the 
first MM
+    // directory, then commit the first write ID.
+    List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>();
+    Hive db = getHive();
+    long mmWriteId = db.getNextTableWriteId(tbl.getDbName(), 
tbl.getTableName());
+    String mmDir = ValidWriteIds.getMmFilePrefix(mmWriteId);
+    if (tbl.getPartitionKeys().size() > 0) {
+      PartitionIterable parts = new PartitionIterable(db, tbl, null,
+          HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
+      Iterator<Partition> partIter = parts.iterator();
+      while (partIter.hasNext()) {
+        Partition part = partIter.next();
+        checkMmLb(part);
+        Path src = part.getDataLocation(), tgt = new Path(src, mmDir);
+        srcs.add(src);
+        tgts.add(tgt);
+        Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+      }
+    } else {
+      checkMmLb(tbl);
+      Path src = tbl.getDataLocation(), tgt = new Path(src, mmDir);
+      srcs.add(src);
+      tgts.add(tgt);
+      Utilities.LOG14535.info("Will move " + src + " to " + tgt);
+    }
+    // Don't set inputs and outputs - the locks have already been taken so it's pointless.
+    MoveWork mw = new MoveWork(null, null, null, null, false);
+    mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
+    ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId);
+    // TODO# this is hacky and will be gone with ACID. The problem is getting the write ID above
+    //       modifies the table, but the table object above is preserved and modified without
+    //       getting this change, so saving it will overwrite write ID. Ideally, when we save
+    //       only specific fields, and not overwrite write ID every time we alter table.
+    //       There's probably some way in DN to achieve that, but for now let's just update the
+    //       original object here. This is safe due to DDL lock and the fact that converting
+    //       the table to MM here from non-MM should mean no concurrent write ID updates.
+    tbl.setMmNextWriteId(mmWriteId + 1);
+    Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf);
+    mv.addDependentTask(ic);
+    return Lists.<Task<?>>newArrayList(mv);
+  }
+
+  private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl,
+      Partition part) throws HiveException {
+    List<Task<?>> result = null;
+    if (part != null) {
+      part.getTPartition().getParameters().putAll(alterTbl.getProps());
+    } else {
+      boolean isFromMmTable = 
MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()),
+          isToMmTable = MetaStoreUtils.isInsertOnlyTable(alterTbl.getProps());
+      if (!isFromMmTable && isToMmTable) {
+        result = generateAddMmTasks(tbl);
+      } else if (isFromMmTable && !isToMmTable) {
+        result = generateRemoveMmTasks(tbl);
+      }
+      tbl.getTTable().getParameters().putAll(alterTbl.getProps());
+    }
+    return result;
   }
 
    private int dropConstraint(Hive db, AlterTableDesc alterTbl)
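
[Illustration, not part of the patch] A standalone sketch of the cleanup decision made in handleRemoveMm()/generateRemoveMmTasks() above: committed mm_<writeId> directories are queued for a rename that strips the prefix, while stray files and unknown or uncommitted directories are deleted. The "mm" prefix value and the extractWriteId() helper are assumptions standing in for ValidWriteIds.MM_PREFIX and ValidWriteIds.extractWriteId(), whose definitions are not in this diff.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveMmSketch {
  static final String MM_PREFIX = "mm";  // assumed value of ValidWriteIds.MM_PREFIX

  public static void main(String[] args) {
    Set<Long> committedWriteIds = new HashSet<>(Arrays.asList(1L, 2L));
    File tableDir = new File(args.length > 0 ? args[0] : ".");
    List<File> toRename = new ArrayList<>();
    File[] children = tableDir.listFiles();
    if (children == null) return;
    for (File child : children) {
      Long writeId = extractWriteId(child.getName());
      if (!child.isDirectory() || writeId == null || !committedWriteIds.contains(writeId)) {
        System.out.println("would delete " + child);   // stray file, unknown or uncommitted dir
      } else {
        toRename.add(child);                           // committed MM dir: keep, strip prefix
      }
    }
    for (File dir : toRename) {
      String plainName = dir.getName().substring(MM_PREFIX.length() + 1);
      System.out.println("would rename " + dir + " -> " + new File(dir.getParent(), plainName));
    }
  }

  // Hypothetical stand-in for ValidWriteIds.extractWriteId().
  static Long extractWriteId(String name) {
    if (!name.startsWith(MM_PREFIX + "_")) return null;
    try { return Long.parseLong(name.substring(MM_PREFIX.length() + 1)); }
    catch (NumberFormatException e) { return null; }
  }
}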

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
index efa9bc3..ba009b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
@@ -39,6 +39,7 @@ public class ImportCommitTask extends Task<ImportCommitWork> {
 
     try {
       if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
+        Utilities.LOG14535.info("Exiting due to explain");
         return 0;
       }
       Hive db = getHive();

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 22843c9..76e399e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -278,6 +278,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
 
       // Multi-file load is for dynamic partitions when some partitions do not
       // need to merge and they can simply be moved to the target directory.
+      // This is also used for MM table conversion.
       LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork();
       if (lmfd != null) {
         boolean isDfsDir = lmfd.getIsDfsDir();

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..ad5728f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -172,7 +172,7 @@ public class StatsTask extends Task<StatsWork> implements 
Serializable {
         // work.getLoadTableDesc().getReplace() is true means insert overwrite 
command 
         // work.getLoadFileDesc().getDestinationCreateTable().isEmpty() means 
CTAS etc.
         // acidTable will not have accurate stats unless it is set through 
analyze command.
-        if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
+        if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) {
           StatsSetupConst.setBasicStatsState(parameters, 
StatsSetupConst.FALSE);
         } else if (work.getTableSpecs() != null
             || (work.getLoadTableDesc() != null && 
work.getLoadTableDesc().getReplace())
@@ -222,7 +222,7 @@ public class StatsTask extends Task<StatsWork> implements 
Serializable {
           //
           org.apache.hadoop.hive.metastore.api.Partition tPart = 
partn.getTPartition();
           Map<String, String> parameters = tPart.getParameters();
-          if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
+          if (work.getTableSpecs() == null && 
AcidUtils.isFullAcidTable(table)) {
             StatsSetupConst.setBasicStatsState(parameters, 
StatsSetupConst.FALSE);
           } else if (work.getTableSpecs() != null
               || (work.getLoadTableDesc() != null && 
work.getLoadTableDesc().getReplace())

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 9e18638..1c18975 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -212,7 +212,8 @@ public class WriteEntity extends Entity implements 
Serializable {
 
       case ADDPARTITION:
       case ADDSERDEPROPS:
-      case ADDPROPS: return WriteType.DDL_SHARED;
+      case ADDPROPS:
+        return WriteType.DDL_SHARED;
 
       case COMPACT:
       case TOUCH: return WriteType.DDL_NO_LOCK;

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2dfbc8d..26c65f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1108,6 +1108,10 @@ public class AcidUtils {
     return tableIsTransactional != null && 
tableIsTransactional.equalsIgnoreCase("true");
   }
 
+  public static boolean isFullAcidTable(Table table) {
+    return isAcidTable(table) && 
!MetaStoreUtils.isInsertOnlyTable(table.getParameters());
+  }
+
   /**
    * Sets the acidOperationalProperties in the configuration object argument.
    * @param conf Mutable configuration object
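
[Illustration, not part of the patch] A minimal sketch of the distinction the new isFullAcidTable() draws, written against a plain parameter map. The property names match the values visible elsewhere in this patch; the helper methods are illustrative, not Hive's own.

import java.util.HashMap;
import java.util.Map;

public class AcidKindSketch {
  static boolean isTransactional(Map<String, String> params) {
    return "true".equalsIgnoreCase(params.get("transactional"));
  }

  static boolean isInsertOnly(Map<String, String> params) {
    return "insert_only".equalsIgnoreCase(params.get("transactional_properties"));
  }

  static boolean isFullAcid(Map<String, String> params) {
    return isTransactional(params) && !isInsertOnly(params);
  }

  public static void main(String[] args) {
    Map<String, String> mm = new HashMap<>();
    mm.put("transactional", "true");
    mm.put("transactional_properties", "insert_only");
    // An MM (insert-only) table is transactional but not full ACID, so compaction,
    // ACID locking, and the ACID-only optimizer checks touched by this patch skip it.
    System.out.println("MM table full ACID? " + isFullAcid(mm));   // false
  }
}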

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index da7505b..40d055a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -230,7 +230,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
           // This is a file or something we don't hold locks for.
           continue;
       }
-      if(t != null && AcidUtils.isAcidTable(t)) {
+      if(t != null && AcidUtils.isFullAcidTable(t)) {
         compBuilder.setIsAcid(true);
       }
       LockComponent comp = compBuilder.build();
@@ -261,7 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
         case INSERT:
           t = getTable(output);
-          if(AcidUtils.isAcidTable(t)) {
+          if(AcidUtils.isFullAcidTable(t)) {
             compBuilder.setShared();
             compBuilder.setIsAcid(true);
           }
@@ -318,7 +318,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
           // This is a file or something we don't hold locks for.
           continue;
       }
-      if(t != null && AcidUtils.isAcidTable(t)) {
+      if(t != null && AcidUtils.isFullAcidTable(t)) {
         compBuilder.setIsAcid(true);
       }
       LockComponent comp = compBuilder.build();

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4a8df95..a1ec7e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2508,7 +2508,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param tbl
    *          object for which partition is needed
    * @return list of partition objects
-   * @throws HiveException
    */
   public List<Partition> getPartitions(Table tbl) throws HiveException {
     if (tbl.isPartitioned()) {
@@ -3143,13 +3142,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     HdfsUtils.HadoopFileStatus destStatus = null;
 
-    // If source path is a subdirectory of the destination path:
+    // If source path is a subdirectory of the destination path (or the other 
way around):
     //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT 
src.value WHERE src.key >= 300;
     //   where the staging directory is a subdirectory of the destination 
directory
     // (1) Do not delete the dest dir before doing the move operation.
     // (2) It is assumed that subdir and dir are in same encryption zone.
     // (3) Move individual files from scr dir to dest dir.
-    boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal);
+    boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, 
isSrcLocal),
+        destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
     try {
       if (inheritPerms || replace) {
         try{
@@ -3159,7 +3159,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           //if replace is false, rename (mv) actually move the src under dest 
dir
           //if destf is an existing file, rename is actually a replace, and do 
not need
           // to delete the file first
-          if (replace && !destIsSubDir) {
+          if (replace && !srcIsSubDirOfDest) {
             destFs.delete(destf, true);
             LOG.debug("The path " + destf.toString() + " is deleted");
           }
@@ -3192,13 +3192,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
               replace, // overwrite destination
               conf);
         } else {
-          if (destIsSubDir) {
+          if (srcIsSubDirOfDest || destIsSubDirOfSrc) {
             FileStatus[] srcs = destFs.listStatus(srcf, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
 
             List<Future<Void>> futures = new LinkedList<>();
             final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
                 
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
                 new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) 
: null;
+            if (destIsSubDirOfSrc && !destFs.exists(destf)) {
+              Utilities.LOG14535.info("Creating " + destf);
+              destFs.mkdirs(destf);
+            }
             /* Move files one by one because source is a subdirectory of 
destination */
             for (final FileStatus srcStatus : srcs) {
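
[Illustration, not part of the patch] A small sketch of the two directory relationships the modified move logic now distinguishes, using java.nio.Path.startsWith as a stand-in for Hive's isSubDir(); the paths are made up for illustration.

import java.nio.file.Path;
import java.nio.file.Paths;

public class SubDirSketch {
  public static void main(String[] args) {
    Path srcf  = Paths.get("/warehouse/t1");        // table directory being converted
    Path destf = Paths.get("/warehouse/t1/mm_1");   // first MM directory under it
    boolean srcIsSubDirOfDest = srcf.startsWith(destf);
    boolean destIsSubDirOfSrc = destf.startsWith(srcf);
    // When converting to MM, the destination sits under the source, so the move must
    // not delete destf up front and may need to create it before moving files one by one.
    // Prints: srcIsSubDirOfDest=false, destIsSubDirOfSrc=true
    System.out.println("srcIsSubDirOfDest=" + srcIsSubDirOfDest
        + ", destIsSubDirOfSrc=" + destIsSubDirOfSrc);
  }
}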
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index ea90889..d3466fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -986,4 +986,7 @@ public class Table implements Serializable {
     return deserializer != null;
   }
 
+  public void setMmNextWriteId(long writeId) {
+    this.tTable.setMmNextWriteId(writeId);
+  }
 };

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index da261bb..25836c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -409,7 +409,7 @@ public class BucketingSortingReduceSinkOptimizer extends 
Transform {
 
       if(stack.get(0) instanceof TableScanOperator) {
         TableScanOperator tso = ((TableScanOperator)stack.get(0));
-        if(AcidUtils.isAcidTable(tso.getConf().getTableMetadata())) {
+        if(AcidUtils.isFullAcidTable(tso.getConf().getTableMetadata())) {
           /*ACID tables have complex directory layout and require merging of 
delta files
           * on read thus we should not try to read bucket files directly*/
           return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
index 17510e9..5a9c72f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
@@ -319,7 +319,7 @@ public class StatsOptimizer extends Transform {
         }
 
         Table tbl = tsOp.getConf().getTableMetadata();
-        if (AcidUtils.isAcidTable(tbl)) {
+        if (AcidUtils.isFullAcidTable(tbl)) {
           Logger.info("Table " + tbl.getTableName() + " is ACID table. Skip 
StatsOptimizer.");
           return null;
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 3e016f3..e6a31e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -72,7 +73,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo;
 import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils;
 import 
org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory;
 import 
org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl;
@@ -184,6 +184,7 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private final Set<String> reservedPartitionValues;
   private final HiveAuthorizationTaskFactory hiveAuthorizationTaskFactory;
+  private WriteEntity alterTableOutput;
 
   static {
     TokenToTypeName.put(HiveParser.TOK_BOOLEAN, 
serdeConstants.BOOLEAN_TYPE_NAME);
@@ -674,6 +675,7 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   }
 
   private void analyzeShowRoles(ASTNode ast) throws SemanticException {
+    @SuppressWarnings("unchecked")
     Task<DDLWork> roleDDLTask = (Task<DDLWork>) hiveAuthorizationTaskFactory
         .createShowRolesTask(ast, ctx.getResFile(), getInputs(), getOutputs());
 
@@ -1413,10 +1415,11 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     alterTblDesc.setEnvironmentContext(environmentContext);
     alterTblDesc.setOldName(tableName);
 
-    addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc);
+    boolean isPotentialMmSwitch = 
mapProp.containsKey(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL)
+        || 
mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, 
isPotentialMmSwitch);
 
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
-        alterTblDesc), conf));
+    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), 
alterTblDesc), conf));
   }
 
   private void analyzeAlterTableSerdeProps(ASTNode ast, String tableName,
@@ -1476,16 +1479,21 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private void addInputsOutputsAlterTable(String tableName, Map<String, 
String> partSpec,
       AlterTableTypes op) throws SemanticException {
-    addInputsOutputsAlterTable(tableName, partSpec, null, op);
+    addInputsOutputsAlterTable(tableName, partSpec, null, op, false);
+  }
+
+  private void addInputsOutputsAlterTable(String tableName, Map<String, 
String> partSpec,
+      AlterTableDesc desc, boolean doForceExclusive) throws SemanticException {
+     addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp(), 
doForceExclusive);
   }
 
   private void addInputsOutputsAlterTable(String tableName, Map<String, 
String> partSpec,
      AlterTableDesc desc) throws SemanticException {
-    addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp());
+    addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp(), false);
   }
 
   private void addInputsOutputsAlterTable(String tableName, Map<String, 
String> partSpec,
-      AlterTableDesc desc, AlterTableTypes op) throws SemanticException {
+      AlterTableDesc desc, AlterTableTypes op, boolean doForceExclusive) 
throws SemanticException {
     boolean isCascade = desc != null && desc.getIsCascade();
     boolean alterPartitions = partSpec != null && !partSpec.isEmpty();
     //cascade only occurs at table level then cascade to partition level
@@ -1496,11 +1504,13 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
     Table tab = getTable(tableName, true);
     // Determine the lock type to acquire
-    WriteEntity.WriteType writeType = 
WriteEntity.determineAlterTableWriteType(op);
+    WriteEntity.WriteType writeType = doForceExclusive
+        ? WriteType.DDL_EXCLUSIVE : 
WriteEntity.determineAlterTableWriteType(op);
 
     if (!alterPartitions) {
       inputs.add(new ReadEntity(tab));
-      outputs.add(new WriteEntity(tab, writeType));
+      alterTableOutput = new WriteEntity(tab, writeType);
+      outputs.add(alterTableOutput);
       //do not need the lock for partitions since they are covered by the 
table lock
       if (isCascade) {
         for (Partition part : getPartitions(tab, partSpec, false)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 35cfcd9..e8b2b84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1946,7 +1946,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
 
       // Disallow INSERT INTO on bucketized tables
-      boolean isAcid = AcidUtils.isAcidTable(tab);
+      boolean isAcid = AcidUtils.isFullAcidTable(tab);
       boolean isTableWrittenTo = 
qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
       if (isTableWrittenTo &&
           tab.getNumBuckets() > 0 && !isAcid) {
@@ -6471,7 +6471,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         nullOrder.append(sortOrder == 
BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), 
nullOrder.toString(),
-              maxReducers, (AcidUtils.isAcidTable(dest_tab) ?
+              maxReducers, (AcidUtils.isFullAcidTable(dest_tab) ?
               getAcidType(dest_tab, table_desc.getOutputFileFormatClass()) : 
AcidUtils.Operation.NOT_ACID));
       
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
@@ -6557,7 +6557,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     case QBMetaData.DEST_TABLE: {
 
       dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
       destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
@@ -6633,7 +6633,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
-      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
 
       checkExternalTable(dest_tab);
 
@@ -6702,7 +6702,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         field_schemas = new ArrayList<FieldSchema>();
         destTableIsTemporary = tblDesc.isTemporary();
         destTableIsMaterialization = tblDesc.isMaterialization();
-        if (MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps())) {
+        if (!destTableIsTemporary && 
MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps())) {
           isMmTable = isMmCtas = true;
           // TODO# this should really get current ACID txn; assuming ACID 
works correctly the txn
           //       should have been opened to create the ACID table. For now 
use the first ID.
@@ -11429,7 +11429,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       if (p != null) {
         tbl = p.getTable();
       }
-      if (tbl != null && AcidUtils.isAcidTable(tbl)) {
+      if (tbl != null && AcidUtils.isFullAcidTable(tbl)) {
         acidInQuery = true;
         checkAcidTxnManager(tbl);
       }
@@ -11492,7 +11492,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         tbl = writeEntity.getTable();
       }
 
-      if (tbl != null && AcidUtils.isAcidTable(tbl)) {
+      if (tbl != null && AcidUtils.isFullAcidTable(tbl)) {
         acidInQuery = true;
         checkAcidTxnManager(tbl);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index ebe613e..65b6813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -128,7 +128,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
     this.alias = alias;
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
-    isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+    isAcidTable = AcidUtils.isFullAcidTable(this.tableMetadata);
     if (isAcidTable) {
       acidOperationalProperties = 
AcidUtils.getAcidOperationalProperties(this.tableMetadata);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/test/queries/clientpositive/mm_conversions.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_conversions.q b/ql/src/test/queries/clientpositive/mm_conversions.q
new file mode 100644
index 0000000..4fcaf73
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_conversions.q
@@ -0,0 +1,85 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+set tez.grouping.min-size=1;
+set tez.grouping.max-size=2;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+
+-- Force multiple writers when reading
+drop table intermediate;
+create table intermediate(key int) partitioned by (p int) stored as orc;
+insert into table intermediate partition(p='455') select distinct key from src 
where key >= 0 order by key desc limit 1;
+insert into table intermediate partition(p='456') select distinct key from src 
where key is not null order by key asc limit 1;
+insert into table intermediate partition(p='457') select distinct key from src 
where key >= 100 order by key asc limit 1;
+
+drop table simple_from_mm;
+create table simple_from_mm(key int) stored as orc tblproperties 
("transactional"="true", "transactional_properties"="insert_only");
+insert into table simple_from_mm select key from intermediate;
+insert into table simple_from_mm select key from intermediate;
+select * from simple_from_mm;
+alter table simple_from_mm unset tblproperties('transactional_properties', 
'transactional');
+select * from simple_from_mm;
+insert into table simple_from_mm select key from intermediate;
+select * from simple_from_mm;
+alter table simple_from_mm set tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
+select * from simple_from_mm;
+insert into table simple_from_mm select key from intermediate;
+select * from simple_from_mm;
+alter table simple_from_mm set tblproperties("transactional"="false", 
'transactional_properties'='false');
+select * from simple_from_mm;
+insert into table simple_from_mm select key from intermediate;
+select * from simple_from_mm;
+drop table simple_from_mm;
+
+drop table simple_to_mm;
+create table simple_to_mm(key int) stored as orc;
+insert into table simple_to_mm select key from intermediate;
+insert into table simple_to_mm select key from intermediate;
+select * from simple_to_mm;
+alter table simple_to_mm set tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
+select * from simple_to_mm;
+insert into table simple_to_mm select key from intermediate;
+insert into table simple_to_mm select key from intermediate;
+select * from simple_to_mm;
+drop table simple_to_mm;
+
+drop table part_from_mm;
+create table part_from_mm(key int) partitioned by (key_mm int) stored as orc 
tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
+insert into table part_from_mm partition(key_mm='455') select key from 
intermediate;
+insert into table part_from_mm partition(key_mm='455') select key from 
intermediate;
+insert into table part_from_mm partition(key_mm='456') select key from 
intermediate;
+select * from part_from_mm;
+alter table part_from_mm unset tblproperties('transactional_properties', 
'transactional');
+select * from part_from_mm;
+insert into table part_from_mm partition(key_mm='456') select key from 
intermediate;
+insert into table part_from_mm partition(key_mm='457') select key from 
intermediate;
+select * from part_from_mm;
+alter table part_from_mm set tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
+select * from part_from_mm;
+insert into table part_from_mm partition(key_mm='456') select key from 
intermediate;
+insert into table part_from_mm partition(key_mm='455') select key from 
intermediate;
+select * from part_from_mm;
+alter table part_from_mm set tblproperties("transactional"="false", 
'transactional_properties'='false');
+select * from part_from_mm;
+insert into table part_from_mm partition(key_mm='457') select key from 
intermediate;
+select * from part_from_mm;
+drop table part_from_mm;
+
+drop table part_to_mm;
+create table part_to_mm(key int) partitioned by (key_mm int) stored as orc;
+insert into table part_to_mm partition(key_mm='455') select key from 
intermediate;
+insert into table part_to_mm partition(key_mm='456') select key from 
intermediate;
+select * from part_to_mm;
+alter table part_to_mm set tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
+select * from part_to_mm;
+insert into table part_to_mm partition(key_mm='456') select key from 
intermediate;
+insert into table part_to_mm partition(key_mm='457') select key from 
intermediate;
+select * from part_to_mm;
+drop table part_to_mm;
+
+
+
+
+
+drop table intermediate;
