http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 0000000,a1187c4..6354f77
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@@ -1,0 -1,243 +1,245 @@@
+ /*
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+ 
+       http://www.apache.org/licenses/LICENSE-2.0
+ 
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  */
+ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
+ 
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.Database;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.ql.ErrorMsg;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+ import org.apache.hadoop.hive.ql.exec.Task;
+ import org.apache.hadoop.hive.ql.exec.TaskFactory;
+ import org.apache.hadoop.hive.ql.exec.Utilities;
+ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+ import org.apache.hadoop.hive.ql.metadata.Table;
+ import org.apache.hadoop.hive.ql.parse.EximUtil;
+ import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+ import org.apache.hadoop.hive.ql.parse.SemanticException;
+ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+ import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+ import org.apache.hadoop.hive.ql.plan.MoveWork;
++import org.apache.hadoop.hive.ql.session.SessionState;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.IOException;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.TreeMap;
+ 
+ import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
+ 
+ public class LoadTable {
+   private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class);
+   //  private final Helper helper;
+   private final Context context;
+   private final ReplLogger replLogger;
+   private final TableContext tableContext;
+   private final TaskTracker tracker;
+   private final TableEvent event;
+ 
+   public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
+                    TableContext tableContext, TaskTracker limiter)
+       throws SemanticException, IOException {
+     this.event = event;
+     this.context = context;
+     this.replLogger = replLogger;
+     this.tableContext = tableContext;
+     this.tracker = new TaskTracker(limiter);
+   }
+ 
+   private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
+     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType);
+     Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
+     ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+ 
+     if (tracker.tasks().isEmpty()) {
+       tracker.addTask(replLogTask);
+     } else {
+       ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+ 
+       List<Task<? extends Serializable>> visited = new ArrayList<>();
+       tracker.updateTaskCount(replLogTask, visited);
+     }
+   }
+ 
+   public TaskTracker tasks() throws SemanticException {
+     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+     // If tblName is null, then we default to the table name specified in _metadata, which is good.
+     // or are both specified, in which case, that's what we are intended to create the new table as.
+     try {
+       if (event.shouldNotReplicate()) {
+         return tracker;
+       }
+       String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty;
+       // Create table associated with the import
+       // Executed if relevant, and used to contain all the other details about the table if not.
+       ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
+       Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
+       ReplicationSpec replicationSpec = event.replicationSpec();
+ 
+       // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+       // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+       // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+       // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+       // defaults and do not error out in that case.
+       // the above will change now since we are going to split replication load in multiple execution
+       // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will
+       // be false and hence if db Not found we should error out.
+       Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+       if (parentDb == null) {
+         if (!tableContext.waitOnPrecursor()) {
+           throw new SemanticException(
+               ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName()));
+         }
+       }
+ 
+       if (table == null) {
+         // If table doesn't exist, allow creating a new one only if the database state is older than the update.
+         if ((parentDb != null) && (!replicationSpec
+             .allowReplacementInto(parentDb.getParameters()))) {
+           // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+           return tracker;
+         }
+       } else {
+         if (!replicationSpec.allowReplacementInto(table.getParameters())) {
+           // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+           return tracker;
+         }
+       }
+ 
+       if (tableDesc.getLocation() == null) {
+         tableDesc.setLocation(location(tableDesc, parentDb));
+       }
+ 
+ 
+   /* Note: In the following section, Metadata-only import handling logic is
+      interleaved with regular repl-import logic. The rule of thumb being
+      followed here is that MD-only imports are essentially ALTERs. They do
+      not load data, and should not be "creating" any metadata - they should
+      be replacing instead. The only place it makes sense for a MD-only import
+      to create is in the case of a table that's been dropped and recreated,
+      or in the case of an unpartitioned table. In all other cases, it should
+      behave like a noop or a pure MD alter.
+   */
+       if (table == null) {
+         newTableTasks(tableDesc);
+       } else {
+         existingTableTasks(tableDesc, table, replicationSpec);
+       }
+ 
+       if (!isPartitioned(tableDesc)) {
+         createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType());
+       }
+       return tracker;
+     } catch (Exception e) {
+       throw new SemanticException(e);
+     }
+   }
+ 
+   private void existingTableTasks(ImportTableDesc tblDesc, Table table,
+       ReplicationSpec replicationSpec) {
+     if (!table.isPartitioned()) {
+ 
+       LOG.debug("table non-partitioned");
+       if (!replicationSpec.allowReplacementInto(table.getParameters())) {
+         return; // silently return, table is newer than our replacement.
+       }
+ 
+       Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, replicationSpec);
+       if (replicationSpec.isMetadataOnly()) {
+         tracker.addTask(alterTableTask);
+       } else {
+         Task<?> loadTableTask =
+             loadTableTask(table, replicationSpec, event.metadataPath(), event.metadataPath());
+         alterTableTask.addDependentTask(loadTableTask);
+         tracker.addTask(alterTableTask);
+       }
+     }
+   }
+ 
+   private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException {
+     Table table;
+     table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+     // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
+     Task<?> createTableTask =
+         tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
+     if (event.replicationSpec().isMetadataOnly()) {
+       tracker.addTask(createTableTask);
+       return;
+     }
+     if (!isPartitioned(tblDesc)) {
+       LOG.debug("adding dependent CopyWork/MoveWork for table");
+       Task<?> loadTableTask =
+           loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()),
+               event.metadataPath());
+       createTableTask.addDependentTask(loadTableTask);
+     }
+     tracker.addTask(createTableTask);
+   }
+ 
+   private String location(ImportTableDesc tblDesc, Database parentDb)
+       throws MetaException, SemanticException {
+     if (!tableContext.waitOnPrecursor()) {
+       return context.warehouse.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString();
+     } else {
+       Path tablePath = new Path(
+           context.warehouse.getDefaultDatabasePath(tblDesc.getDatabaseName()),
+           MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase())
+       );
+       return context.warehouse.getDnsPath(tablePath).toString();
+     }
+   }
+ 
+   private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
+       Path fromURI) {
+     Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME);
+     Path tmpPath = context.utils.getExternalTmpPath(tgtPath);
+     Task<?> copyTask =
+         ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
+ 
+     LoadTableDesc loadTableWork = new LoadTableDesc(
 -        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace()
++        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace(),
++        SessionState.get().getTxnMgr().getCurrentTxnId()
+     );
+     MoveWork moveWork =
+         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+     Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
+     copyTask.addDependentTask(loadTableTask);
+     return copyTask;
+   }
+ 
+   private Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
+       ReplicationSpec replicationSpec) {
+     tableDesc.setReplaceMode(true);
+     if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
+       tableDesc.setReplicationSpec(replicationSpec);
+     }
+     return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
+   }
+ }
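The no-op branches above hinge on ReplicationSpec.allowReplacementInto(...), which compares the incoming replication event against the destination's repl.last.id. A minimal standalone sketch of that comparison follows; the class, method shape, and exact parameter key are illustrative assumptions rather than the Hive implementation.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical model of the "replace only if the destination is older" rule
    // behind allowReplacementInto(); the key name below is an assumption.
    public class ReplReplacementSketch {
      static final String REPL_LAST_ID = "repl.last.id";

      static boolean allowReplacementInto(Map<String, String> destParams, long incomingEventId) {
        String lastId = destParams.get(REPL_LAST_ID);
        if (lastId == null) {
          return true; // never replicated into before, safe to replace
        }
        // noop when the destination is already at or beyond the incoming event
        return incomingEventId > Long.parseLong(lastId);
      }

      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(REPL_LAST_ID, "100");
        System.out.println(allowReplacementInto(params, 90));  // false -> noop
        System.out.println(allowReplacementInto(params, 120)); // true  -> replace
      }
    }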

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index cfa4e2c,90d1372..b5733ec
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@@ -145,12 -162,18 +162,18 @@@ public class VectorizedRowBatchCtx 
  
    public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf,
        FileSplit split, Object[] partitionValues) throws IOException {
+     // TODO: this is invalid for SMB. Keep this for now for legacy reasons. See the other overload.
+     MapWork mapWork = Utilities.getMapWork(hiveConf);
+     getPartitionValues(vrbCtx, mapWork, split, partitionValues);
+   }
  
-     Map<Path, PartitionDesc> pathToPartitionInfo = Utilities
-         .getMapWork(hiveConf).getPathToPartitionInfo();
+   public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx,
+       MapWork mapWork, FileSplit split, Object[] partitionValues)
+       throws IOException {
+     Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
  
      PartitionDesc partDesc = HiveFileFormatUtils
 -        .getPartitionDescFromPathRecursively(pathToPartitionInfo,
 +        .getFromPathRecursively(pathToPartitionInfo,
              split.getPath(), IOPrepareCache.get().getPartitionDescMap());
  
      getPartitionValues(vrbCtx, partDesc, partitionValues);
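The rename from getPartitionDescFromPathRecursively to getFromPathRecursively keeps the same lookup idea: walk the split path up through its parents until an entry is found in the path-to-partition map. The standalone sketch below illustrates that walk with java.nio and a plain Map instead of the Hadoop/Hive types, purely as an assumed simplification.

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-in for a recursive path lookup: the closest registered
    // ancestor of the split path wins.
    public class PathLookupSketch {
      static <V> V findByPathRecursively(Map<Path, V> pathToInfo, Path splitPath) {
        for (Path p = splitPath; p != null; p = p.getParent()) {
          V v = pathToInfo.get(p);
          if (v != null) {
            return v;
          }
        }
        return null; // no ancestor of the split path is known
      }

      public static void main(String[] args) {
        Map<Path, String> info = new HashMap<>();
        info.put(Paths.get("/warehouse/t1/p=1"), "partition p=1");
        System.out.println(findByPathRecursively(info, Paths.get("/warehouse/t1/p=1/bucket_00000")));
      }
    }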

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index f325c0e,feacdd8..9c9a2d3
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@@ -36,9 -37,9 +37,10 @@@ import org.apache.hadoop.hive.conf.Hive
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.metastore.api.DataOperationType;
  import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
  import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
  import org.apache.hadoop.hive.ql.ErrorMsg;
+ import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
  import org.apache.hadoop.hive.ql.metadata.Table;
  import org.apache.hadoop.hive.shims.HadoopShims;
  import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
@@@ -345,11 -384,7 +385,10 @@@ public class AcidUtils 
      public static final String SPLIT_UPDATE_STRING = "split_update";
      public static final int HASH_BASED_MERGE_BIT = 0x02;
      public static final String HASH_BASED_MERGE_STRING = "hash_merge";
 +    public static final int INSERT_ONLY_BIT = 0x04;
 +    public static final String INSERT_ONLY_STRING = "insert_only";
      public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
-     public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
 +    public static final String INSERTONLY_VALUE_STRING = TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY;
  
      private AcidOperationalProperties() {
      }
@@@ -401,12 -414,6 +430,9 @@@
        if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
          return AcidOperationalProperties.getDefault();
        }
-       if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
-         return AcidOperationalProperties.getLegacy();
-       }
 +      if (propertiesStr.equalsIgnoreCase(INSERTONLY_VALUE_STRING)) {
 +        return AcidOperationalProperties.getInsertOnly();
 +      }
        AcidOperationalProperties obj = new AcidOperationalProperties();
        String[] options = propertiesStr.split("\\|");
        for (String option : options) {
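The AcidOperationalProperties change above adds an insert_only flag next to split_update and hash_merge, each stored as a bit and parsed from a '|'-separated string. The toy model below mirrors that encoding; only the 0x02 and 0x04 values are visible in the hunk, so the 0x01 value for split_update is an assumption.

    // Toy model of the bit-encoded transactional properties parsed above;
    // not the Hive AcidUtils class, and SPLIT_UPDATE_BIT = 0x01 is assumed.
    public class AcidPropsSketch {
      static final int SPLIT_UPDATE_BIT = 0x01;     // assumed
      static final int HASH_BASED_MERGE_BIT = 0x02; // from the hunk
      static final int INSERT_ONLY_BIT = 0x04;      // added by the hunk

      static int parse(String propertiesStr) {
        int flags = 0;
        for (String option : propertiesStr.split("\\|")) {
          switch (option) {
            case "split_update": flags |= SPLIT_UPDATE_BIT; break;
            case "hash_merge":   flags |= HASH_BASED_MERGE_BIT; break;
            case "insert_only":  flags |= INSERT_ONLY_BIT; break;
            default: throw new IllegalArgumentException("Unknown option: " + option);
          }
        }
        return flags;
      }

      public static void main(String[] args) {
        // prints 101 -> split_update and insert_only set, hash_merge clear
        System.out.println(Integer.toBinaryString(parse("split_update|insert_only")));
      }
    }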

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index a70fde6,38aaeed..6178de2
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@@ -14,16 -14,12 +14,20 @@@
  package org.apache.hadoop.hive.ql.io.parquet;
  
  import java.io.IOException;
 +import java.util.Map;
  
+ import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.common.io.DataCache;
+ import org.apache.hadoop.hive.common.io.FileMetadataCache;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+ import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
 +import org.apache.hadoop.hive.ql.plan.MapWork;
 +import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 +import org.apache.hadoop.mapred.FileSplit;
 +import org.apache.hadoop.mapred.JobConf;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.apache.hadoop.hive.ql.exec.Utilities;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 01e8a48,5dec791..82804b2
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@@ -423,11 -423,19 +423,19 @@@ public final class DbTxnManager extend
            compBuilder.setExclusive();
            compBuilder.setOperationType(DataOperationType.NO_TXN);
            break;
- 
+         case INSERT_OVERWRITE:
+           t = getTable(output);
+           if (AcidUtils.isAcidTable(t)) {
+             compBuilder.setSemiShared();
+             compBuilder.setOperationType(DataOperationType.UPDATE);
+           } else {
+             compBuilder.setExclusive();
+             compBuilder.setOperationType(DataOperationType.NO_TXN);
+           }
+           break;
          case INSERT:
            assert t != null;
 -          if(AcidUtils.isAcidTable(t)) {
 +          if(AcidUtils.isFullAcidTable(t)) {
              compBuilder.setShared();
            }
            else {
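The new INSERT_OVERWRITE case above takes a semi-shared lock with an UPDATE operation type when the target is an ACID table, and an exclusive lock with NO_TXN otherwise. The sketch below restates that decision with stand-in enums; it is not the DbTxnManager API.

    // Illustrative-only restatement of the lock choice added for INSERT_OVERWRITE.
    public class OverwriteLockSketch {
      enum LockType { SEMI_SHARED, EXCLUSIVE }
      enum OperationType { UPDATE, NO_TXN }

      // ACID tables keep their old base/delta files on overwrite (the cleaner
      // removes them later), so a semi-shared lock is enough; a non-ACID
      // overwrite rewrites the directory and needs exclusivity.
      static LockType lockFor(boolean isAcidTable) {
        return isAcidTable ? LockType.SEMI_SHARED : LockType.EXCLUSIVE;
      }

      static OperationType operationFor(boolean isAcidTable) {
        return isAcidTable ? OperationType.UPDATE : OperationType.NO_TXN;
      }

      public static void main(String[] args) {
        System.out.println(lockFor(true) + " / " + operationFor(true));
        System.out.println(lockFor(false) + " / " + operationFor(false));
      }
    }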

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ae451e2,d661f10..a0b735c
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@@ -65,9 -62,9 +65,10 @@@ import org.apache.hadoop.fs.FileStatus
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;
+ import org.apache.hadoop.hive.common.BlobStorageUtils;
  import org.apache.hadoop.hive.common.FileUtils;
  import org.apache.hadoop.hive.common.HiveStatsUtils;
 +import org.apache.hadoop.hive.common.JavaUtils;
  import org.apache.hadoop.hive.common.ObjectPair;
  import org.apache.hadoop.hive.common.StatsSetupConst;
  import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
@@@ -3338,8 -3250,9 +3389,9 @@@ private void constructOneLBLocationMap(
      // (1) Do not delete the dest dir before doing the move operation.
      // (2) It is assumed that subdir and dir are in same encryption zone.
      // (3) Move individual files from scr dir to dest dir.
 -    boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal);
 +    boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
 +        destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
+     final String msg = "Unable to move source " + srcf + " to destination " + destf;
 -
      try {
        if (replace) {
          try{

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 7fd8f04,a054abb..021507f
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@@ -1507,9 -1517,14 +1530,15 @@@ public class DDLSemanticAnalyzer extend
      }
  
      Table tab = getTable(tableName, true);
+     // cascade only occurs with partitioned table
+     if (isCascade && !tab.isPartitioned()) {
+       throw new SemanticException(
+           ErrorMsg.ALTER_TABLE_NON_PARTITIONED_TABLE_CASCADE_NOT_SUPPORTED);
+     }
+ 
      // Determine the lock type to acquire
 -    WriteEntity.WriteType writeType = WriteEntity.determineAlterTableWriteType(op);
 +    WriteEntity.WriteType writeType = doForceExclusive
 +        ? WriteType.DDL_EXCLUSIVE : WriteEntity.determineAlterTableWriteType(op);
  
      if (!alterPartitions) {
        inputs.add(new ReadEntity(tab));
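The added guard rejects ALTER TABLE ... CASCADE on non-partitioned tables and lets doForceExclusive escalate the write type. A compact, non-Hive sketch of that validation (all names below are placeholders):

    // Placeholder sketch of the CASCADE validation and lock-type choice above.
    public class CascadeCheckSketch {
      enum WriteType { DDL_EXCLUSIVE, DDL_SHARED }

      static WriteType validate(boolean isCascade, boolean isPartitioned, boolean forceExclusive) {
        if (isCascade && !isPartitioned) {
          // cascade only makes sense when there are partitions to cascade into
          throw new IllegalArgumentException("ALTER TABLE ... CASCADE requires a partitioned table");
        }
        return forceExclusive ? WriteType.DDL_EXCLUSIVE : WriteType.DDL_SHARED;
      }

      public static void main(String[] args) {
        System.out.println(validate(true, true, true));   // DDL_EXCLUSIVE
        try {
          validate(true, false, false);
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }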

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 2401036,40c34bf..4109de4
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@@ -140,15 -141,15 +141,16 @@@ public class EximUtil 
     * Initialize the URI where the exported data collection is
     * to created for export, or is present for import
     */
-   static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException {
+   public static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException {
      try {
 -      boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
 +      boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE)
 +          || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE);
        URI uri = new Path(dcPath).toUri();
-       String scheme = uri.getScheme();
+       FileSystem fs = FileSystem.get(uri, conf);
+       // Get scheme from FileSystem
+       String scheme = fs.getScheme();
        String authority = uri.getAuthority();
        String path = uri.getPath();
-       FileSystem fs = FileSystem.get(uri, conf);
  
        LOG.info("Path before norm :" + path);
        // generate absolute path relative to home directory
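getValidatedURI now takes the scheme from FileSystem.get(uri, conf).getScheme() rather than from the raw URI, so a scheme-less dump path resolves against the default filesystem. A hedged sketch using the Hadoop client API (hadoop-common on the classpath is assumed):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of resolving a dump location's scheme via the FileSystem;
    // error handling is intentionally minimal.
    public class SchemeResolutionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = new Path("/tmp/repl/dump").toUri(); // no scheme in the string itself
        FileSystem fs = FileSystem.get(uri, conf);
        // Prints "file" with default configuration; with fs.defaultFS set to
        // hdfs://... it would print "hdfs".
        System.out.println("scheme = " + fs.getScheme());
        System.out.println("authority = " + uri.getAuthority());
      }
    }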

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index e534272,b8c6ea9..54ee7ae
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@@ -18,42 -18,12 +18,46 @@@
  
  package org.apache.hadoop.hive.ql.parse;
  
 +
 +import org.apache.hadoop.hive.ql.metadata.HiveException;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.net.URI;
 +import java.util.HashSet;
 +import java.util.List;
 +
 +import org.antlr.runtime.tree.Tree;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.common.FileUtils;
 +import org.apache.hadoop.hive.common.ValidReadTxnList;
 +import org.apache.hadoop.hive.common.ValidTxnList;
 +import org.apache.hadoop.hive.conf.HiveConf;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 +import org.apache.hadoop.hive.ql.Context;
 +import org.apache.hadoop.hive.ql.ErrorMsg;
 +import org.apache.hadoop.hive.ql.QueryState;
 +import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 +import org.apache.hadoop.hive.ql.exec.Task;
 +import org.apache.hadoop.hive.ql.exec.TaskFactory;
 +import org.apache.hadoop.hive.ql.exec.Utilities;
 +import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 +import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 +import org.apache.hadoop.hive.ql.metadata.Hive;
+ import org.antlr.runtime.tree.Tree;
+ import org.apache.hadoop.hive.ql.ErrorMsg;
+ import org.apache.hadoop.hive.ql.QueryState;
  import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 +import org.apache.hadoop.hive.ql.metadata.Partition;
 +import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
  import org.apache.hadoop.hive.ql.metadata.Table;
 +import org.apache.hadoop.hive.ql.plan.CopyWork;
 +import org.slf4j.Logger;
+ import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
  
  /**
   * ExportSemanticAnalyzer.

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5ac15af,7f3460f..387582b
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -62,12 -59,10 +62,12 @@@ import org.apache.hadoop.hive.ql.metada
  import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
  import org.apache.hadoop.hive.ql.metadata.Table;
  import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+ import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
  import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 +import org.apache.hadoop.hive.ql.plan.CopyWork;
 +import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
  import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
  import org.apache.hadoop.hive.ql.plan.DDLWork;
- import org.apache.hadoop.hive.ql.plan.DropTableDesc;
  import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
  import org.apache.hadoop.hive.ql.plan.MoveWork;
  import org.apache.hadoop.hive.ql.session.SessionState;
@@@ -223,9 -228,8 +234,9 @@@ public class ImportSemanticAnalyzer ext
      } catch (Exception e) {
        throw new HiveException(e);
      }
 +    boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
  
-     if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){
+     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
        tblDesc.setReplicationSpec(replicationSpec);
      }
  
@@@ -318,8 -306,8 +324,8 @@@
      } else {
        createReplImportTasks(
            tblDesc, partitionDescs,
-           isPartSpecSet, replicationSpec, waitOnPrecursor, table,
-           fromURI, fs, wh, x, txnId, stmtId, isSourceMm);
+           replicationSpec, waitOnPrecursor, table,
 -          fromURI, fs, wh, x, updatedMetadata);
++          fromURI, fs, wh, x, txnId, stmtId, isSourceMm, updatedMetadata);
      }
      return tableExists;
    }
@@@ -800,8 -734,8 +799,8 @@@
          // ensure if destination is not empty only for regular import
          Path tgtPath = new Path(table.getDataLocation().toString());
          FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
-         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
+         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
 -        loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
 +        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, stmtId, isSourceMm);
        }
        // Set this to read because we can't overwrite any existing partitions
+       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@@ -838,14 -769,8 +837,14 @@@
            tablePath = wh.getDefaultTablePath(parentDb, tblDesc.getTableName());
            }
            FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
-           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
+           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG());
 -          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x));
 +          if (isSourceMm) { // since target table doesn't exist, it should inherit soruce table's properties
 +            Map<String, String> tblproperties = table.getParameters();
 +            tblproperties.put("transactional", "true");
 +            tblproperties.put("transactional_properties", "insert_only");
 +            table.setParameters(tblproperties);
 +          }
 +          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, txnId, stmtId, isSourceMm));
          }
        }
        x.getTasks().add(t);
@@@ -866,29 -783,14 +865,15 @@@
    private static void createReplImportTasks(
        ImportTableDesc tblDesc,
        List<AddPartitionDesc> partitionDescs,
-       boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
+       ReplicationSpec replicationSpec, boolean waitOnPrecursor,
        Table table, URI fromURI, FileSystem fs, Warehouse wh,
-       EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm)
 -      EximUtil.SemanticAnalyzerWrapperContext x,
++      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm,
+       UpdatedMetaDataTracker updatedMetadata)
        throws HiveException, URISyntaxException, IOException, MetaException {
  
 +    Task<?> dr = null;
      WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
  
-     if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){
-       // If destination table exists, but is partitioned, and we think we're writing to an unpartitioned
-       // or if destination table exists, but is unpartitioned and we think we're writing to a partitioned
-       // table, then this can only happen because there are drops in the queue that are yet to be processed.
-       // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we
-       // drop and re-create.
-       if (replicationSpec.allowReplacementInto(table)){
-         dr = dropTableTask(table, x);
-         lockType = WriteEntity.WriteType.DDL_EXCLUSIVE;
-         table = null; // null it out so we go into the table re-create flow.
-       } else {
-         return; // noop out of here.
-       }
-     }
- 
      // Normally, on import, trying to create a table or a partition in a db that does not yet exist
      // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
      // to create tasks to create a table inside a db that as-of-now does not exist, but there is
@@@ -942,21 -863,18 +949,18 @@@
            for (AddPartitionDesc addPartitionDesc : partitionDescs) {
              addPartitionDesc.setReplicationSpec(replicationSpec);
              t.addDependentTask(
 -                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
 +                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+             if (updatedMetadata != null) {
+               updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+             }
            }
          } else {
            x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
 -          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
 +          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, txnId, stmtId, isSourceMm));
          }
        }
-       if (dr == null){
-         // Simply create
-         x.getTasks().add(t);
-       } else {
-         // Drop and recreate
-         dr.addDependentTask(t);
-         x.getTasks().add(dr);
-       }
+       // Simply create
+       x.getTasks().add(t);
      } else {
        // Table existed, and is okay to replicate into, not dropping and re-creating.
        if (table.isPartitioned()) {
@@@ -971,15 -887,18 +975,18 @@@
            if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
              if (!replicationSpec.isMetadataOnly()){
                x.getTasks().add(addSinglePartition(
 -                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
 +                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+               if (updatedMetadata != null) {
+                 updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+               }
              }
            } else {
              // If replicating, then the partition already existing means we need to replace, maybe, if
              // the destination ptn's repl.last.id is older than the replacement's.
-             if (replicationSpec.allowReplacementInto(ptn)){
+             if (replicationSpec.allowReplacementInto(ptn.getParameters())){
                if (!replicationSpec.isMetadataOnly()){
                  x.getTasks().add(addSinglePartition(
 -                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
 +                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
                } else {
                  x.getTasks().add(alterSinglePartition(
                      fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
@@@ -1002,13 -921,9 +1009,10 @@@
          }
        } else {
          x.getLOG().debug("table non-partitioned");
-         if (!replicationSpec.allowReplacementInto(table)){
-           return; // silently return, table is newer than our replacement.
-         }
          if (!replicationSpec.isMetadataOnly()) {
            // repl-imports are replace-into unless the event is insert-into
 -          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
 +          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI),
 +            replicationSpec, x, txnId, stmtId, isSourceMm);
          } else {
            x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
          }

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 9b39cd0,fa79700..5e70863
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@@ -227,6 -228,9 +229,9 @@@ public class LoadSemanticAnalyzer exten
            + " and use 'insert... select' to allow Hive to enforce bucketing. 
" + error);
      }
  
 -    if(AcidUtils.isAcidTable(ts.tableHandle)) {
++    if(AcidUtils.isAcidTable(ts.tableHandle) && !MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
+       throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName());
+     }
      // make sure the arguments make sense
      List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
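The new check above blocks LOAD DATA into full ACID tables while still allowing insert-only (MM) tables. As a rough standalone illustration of that predicate, using the table parameters that the surrounding hunks also set (the helper names are stand-ins for AcidUtils.isAcidTable and MetaStoreUtils.isInsertOnlyTable):

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in predicate mirroring the LOAD DATA guard; the real checks read
    // these parameters through AcidUtils/MetaStoreUtils.
    public class LoadDataGuardSketch {
      static boolean isTransactional(Map<String, String> params) {
        return "true".equalsIgnoreCase(params.get("transactional"));
      }

      static boolean isInsertOnly(Map<String, String> params) {
        return "insert_only".equalsIgnoreCase(params.get("transactional_properties"));
      }

      // LOAD DATA is rejected only for full ACID tables, not for MM tables.
      static boolean rejectLoadData(Map<String, String> params) {
        return isTransactional(params) && !isInsertOnly(params);
      }

      public static void main(String[] args) {
        Map<String, String> fullAcid = new HashMap<>();
        fullAcid.put("transactional", "true");
        Map<String, String> insertOnly = new HashMap<>(fullAcid);
        insertOnly.put("transactional_properties", "insert_only");
        System.out.println(rejectLoadData(fullAcid));   // true  -> SemanticException
        System.out.println(rejectLoadData(insertOnly)); // false -> allowed
      }
    }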
  

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 19882eb,6f379da..ab3cbe1
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6851,17 -6896,13 +6855,21 @@@ public class SemanticAnalyzer extends B
          AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
          if (destTableIsAcid) {
            acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
-           checkAcidConstraints(qb, table_desc, dest_tab, acidOp);
+           checkAcidConstraints(qb, table_desc, dest_tab);
          }
 -        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
 +        if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) {
 +          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +        }
 +        if (isMmTable) {
 +          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
 +        }
 +        boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
 +            dest_tab.getDbName(), dest_tab.getTableName());
 +        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId);
+         // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
+         // deltas and base and leave them up to the cleaner to clean up
+         ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
+             dest_tab.getTableName()) && !destTableIsAcid);
          ltd.setLbCtx(lbCtx);
          loadTableWork.add(ltd);
        } else {
@@@ -6915,17 -7011,13 +6923,19 @@@
        AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
        if (destTableIsAcid) {
          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
-         checkAcidConstraints(qb, table_desc, dest_tab, acidOp);
+         checkAcidConstraints(qb, table_desc, dest_tab);
        }
 -      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
 +      if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) {
 +        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +      }
 +      if (isMmTable) {
 +        txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
 +      }
 +      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId);
+       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
+       // deltas and base and leave them up to the cleaner to clean up
        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-           dest_tab.getTableName()));
+           dest_tab.getTableName()) && !destTableIsAcid);
        ltd.setLbCtx(lbCtx);
  
        loadTableWork.add(ltd);
@@@ -7091,122 -7226,18 +7101,122 @@@
        genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
      }
  
 -    FileSinkDesc fileSinkDesc = new FileSinkDesc(
 -      queryTmpdir,
 -      table_desc,
 -      conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
 -      currentTableId,
 -      rsCtx.isMultiFileSpray(),
 -      canBeMerged,
 -      rsCtx.getNumFiles(),
 -      rsCtx.getTotalFiles(),
 -      rsCtx.getPartnCols(),
 -      dpCtx,
 -      dest_path);
 +    FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
 +        dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
 +        destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-         canBeMerged, txnId, isMmCtas);
++        canBeMerged, dest_tab, txnId, isMmCtas);
 +    if (isMmCtas) {
 +      // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
 +      tableDesc.setWriter(fileSinkDesc);
 +    }
 +
 +    if (SessionState.get().isHiveServerQuery() &&
 +      null != table_desc &&
 +      table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
 +      HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
 +        fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
 +    } else {
 +        fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
 +    }
 +
 +    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
 +        fileSinkDesc, fsRS, input), inputRR);
 +
 +    handleLineage(ltd, output);
 +
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
 +          + dest_path + " row schema: " + inputRR.toString());
 +    }
 +
 +    FileSinkOperator fso = (FileSinkOperator) output;
 +    fso.getConf().setTable(dest_tab);
 +    // the following code is used to collect column stats when
 +    // hive.stats.autogather=true
 +    // and it is an insert overwrite or insert into table
 +    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
 +        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
 +        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
 +      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
 +        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
 +            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 +      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
 +        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
 +            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 +
 +      }
 +    }
 +    return output;
 +  }
 +
 +  private ColsAndTypes deriveFileSinkColTypes(
 +      RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException {
 +    ColsAndTypes result = new ColsAndTypes("", "");
 +    ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
 +    boolean first = true;
 +    for (ColumnInfo colInfo : colInfos) {
 +      String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
 +
 +      if (nm[1] != null) { // non-null column alias
 +        colInfo.setAlias(nm[1]);
 +      }
 +
 +      String colName = colInfo.getInternalName();  //default column name
 +      if (field_schemas != null) {
 +        FieldSchema col = new FieldSchema();
 +        if (!("".equals(nm[0])) && nm[1] != null) {
 +          colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
 +        }
 +        colName = fixCtasColumnName(colName);
 +        col.setName(colName);
 +        String typeName = colInfo.getType().getTypeName();
 +        // CTAS should NOT create a VOID type
 +        if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
 +            throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName));
 +        }
 +        col.setType(typeName);
 +        field_schemas.add(col);
 +      }
 +
 +      if (!first) {
 +        result.cols = result.cols.concat(",");
 +        result.colTypes = result.colTypes.concat(":");
 +      }
 +
 +      first = false;
 +      result.cols = result.cols.concat(colName);
 +
 +      // Replace VOID type with string when the output is a temp table or
 +      // local files.
 +      // A VOID type can be generated under the query:
 +      //
 +      // select NULL from tt;
 +      // or
 +      // insert overwrite local directory "abc" select NULL from tt;
 +      //
 +      // where there is no column type to which the NULL value should be
 +      // converted.
 +      //
 +      String tName = colInfo.getType().getTypeName();
 +      if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
 +        result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME);
 +      } else {
 +        result.colTypes = result.colTypes.concat(tName);
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
 +      Partition dest_part, Path dest_path, int currentTableId,
 +      boolean destTableIsAcid, boolean destTableIsTemporary,
 +      boolean destTableIsMaterialization, Path queryTmpdir,
 +      SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
-       RowSchema fsRS, boolean canBeMerged, Long mmWriteId, boolean isMmCtas) throws SemanticException {
++      RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas) throws SemanticException {
 +    FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
 +        conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
 +        canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
 +        dest_path, mmWriteId, isMmCtas);
  
      boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
      fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
@@@ -7448,14 -7395,12 +7456,9 @@@
      */
      conf.set(AcidUtils.CONF_ACID_KEY, "true");
  
-     if (!Operation.NOT_ACID.equals(acidOp)) {
-       if (table.getNumBuckets() < 1) {
-         throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName());
-       }
-       if (table.getSortCols() != null && table.getSortCols().size() > 0) {
-         throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName());
-       }
+     if (table.getSortCols() != null && table.getSortCols().size() > 0) {
+       throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName());
      }
 -
 -
 -
    }
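After the hunk above, checkAcidConstraints no longer requires the destination to be bucketed and only rejects ACID writes into sorted-bucket tables. A minimal sketch of the remaining constraint (the accessor shape is an assumption):

    import java.util.Collections;
    import java.util.List;

    // Minimal sketch of the remaining ACID write constraint: sorted buckets are
    // still rejected, a bucket count is no longer required.
    public class AcidConstraintSketch {
      static void checkAcidConstraints(List<String> sortCols, String tableName) {
        if (sortCols != null && !sortCols.isEmpty()) {
          throw new IllegalStateException("ACID insert into sorted-bucket table not allowed: " + tableName);
        }
      }

      public static void main(String[] args) {
        checkAcidConstraints(Collections.emptyList(), "acidTbl"); // passes
        try {
          checkAcidConstraints(Collections.singletonList("a"), "sortedTbl");
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());
        }
      }
    }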
  
    /**

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index d0a82af,4f8ba6f..ffc2daf
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@@ -355,6 -396,52 +396,52 @@@ public class GenSparkUtils 
    }
  
    /**
+    * Create and add any dependent move tasks.
+    *
+    * This is forked from {@link GenMapRedUtils}. The difference is that it doesn't check
+    * 'isLinkedFileSink' and does not set parent dir for the linked file sinks.
+    */
+   public static Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+       FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+       HiveConf hconf, DependencyCollectionTask dependencyTask) {
+ 
+     Path dest = null;
++    FileSinkDesc fileSinkDesc = fsOp.getConf();
+ 
+     if (chDir) {
+       dest = fsOp.getConf().getFinalDirName();
+ 
+       // generate the temporary file
+       // it must be on the same file system as the current destination
+       Context baseCtx = parseCtx.getContext();
+ 
+       Path tmpDir = baseCtx.getExternalTmpPath(dest);
+ 
 -      FileSinkDesc fileSinkDesc = fsOp.getConf();
+       // Change all the linked file sink descriptors
+       if (fileSinkDesc.getLinkedFileSinkDesc() != null) {
+         for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) {
+           fsConf.setDirName(tmpDir);
+         }
+       } else {
+         fileSinkDesc.setDirName(tmpDir);
+       }
+     }
+ 
+     Task<MoveWork> mvTask = null;
+ 
+     if (!chDir) {
 -      mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
++      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false);
+     }
+ 
+     // Set the move task to be dependent on the current task
+     if (mvTask != null) {
+       GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+     }
+ 
+     return dest;
+   }
+ 
+   /**
    * Populate partition pruning information from the pruning sink operator to the
    * target MapWork (the MapWork for the big table side). The information include the source table
    * name, column name, and partition key expression. It also set up the temporary path used to
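createMoveTask above either redirects the file sink into an external temp directory (when chDir is set) or looks up an existing MoveTask for the sink's final directory, and then chains the move after the producing task. The schematic below restates that control flow with framework-free stand-ins; none of the types correspond to real Hive classes.

    // Framework-free stand-ins for the createMoveTask flow; illustrative only.
    public class MoveTaskFlowSketch {
      static class FileSink { String finalDirName; String dirName; }
      static class Task { Task dependent; final String label; Task(String label) { this.label = label; } }

      static String createMoveTask(Task currTask, boolean chDir, FileSink sink, Task existingMoveTask) {
        String dest = null;
        if (chDir) {
          dest = sink.finalDirName;
          sink.dirName = dest + "/.staging"; // write to a temp dir; a move publishes it later
        }
        Task mvTask = chDir ? null : existingMoveTask; // reuse a move task only when not redirecting
        if (mvTask != null) {
          currTask.dependent = mvTask; // the move runs after the producing task
        }
        return dest;
      }

      public static void main(String[] args) {
        FileSink sink = new FileSink();
        sink.finalDirName = "/warehouse/t1";
        System.out.println(createMoveTask(new Task("spark-work"), true, sink, null) + " -> " + sink.dirName);
      }
    }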

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 955e0e5,a3df166..4732f0a
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.hadoop.hive.ql.plan
  
  import java.util.ArrayList;
  import java.util.List;
+ import java.util.Objects;
  
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.metadata.Table;
  import org.apache.hadoop.hive.ql.plan.Explain.Level;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 3600a6b,0f129fc..5a8ac31
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@@ -487,79 -439,7 +439,6 @@@ public class TestTxnCommands extends Te
      }
    }
  
-   /**
-    * takes raw data and turns it into a string as if from Driver.getResults()
-    * sorts rows in dictionary order
-    */
-   private List<String> stringifyValues(int[][] rowsIn) {
-     assert rowsIn.length > 0;
-     int[][] rows = rowsIn.clone();
-     Arrays.sort(rows, new RowComp());
-     List<String> rs = new ArrayList<String>();
-     for(int[] row : rows) {
-       assert row.length > 0;
-       StringBuilder sb = new StringBuilder();
-       for(int value : row) {
-         sb.append(value).append("\t");
-       }
-       sb.setLength(sb.length() - 1);
-       rs.add(sb.toString());
-     }
-     return rs;
-   }
-   private static final class RowComp implements Comparator<int[]> {
-     @Override
-     public int compare(int[] row1, int[] row2) {
-       assert row1 != null && row2 != null && row1.length == row2.length;
-       for(int i = 0; i < row1.length; i++) {
-         int comp = Integer.compare(row1[i], row2[i]);
-         if(comp != 0) {
-           return comp;
-         }
-       }
-       return 0;
-     }
-   }
-   private String makeValuesClause(int[][] rows) {
-     assert rows.length > 0;
-     StringBuilder sb = new StringBuilder("values");
-     for(int[] row : rows) {
-       assert row.length > 0;
-       if(row.length > 1) {
-         sb.append("(");
-       }
-       for(int value : row) {
-         sb.append(value).append(",");
-       }
-       sb.setLength(sb.length() - 1);//remove trailing comma
-       if(row.length > 1) {
-         sb.append(")");
-       }
-       sb.append(",");
-     }
-     sb.setLength(sb.length() - 1);//remove trailing comma
-     return sb.toString();
-   }
- 
-   private List<String> runStatementOnDriver(String stmt) throws Exception {
-     LOG.info("Running " + stmt);
-     CommandProcessorResponse cpr = d.run(stmt);
-     if(cpr.getResponseCode() != 0) {
-       throw new RuntimeException(stmt + " failed: " + cpr);
-     }
-     List<String> rs = new ArrayList<String>();
-     d.getResults(rs);
-     return rs;
-   }
-   private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception {
-     CommandProcessorResponse cpr = d.run(stmt);
-     if(cpr.getResponseCode() != 0) {
-       return cpr;
-     }
-     throw new RuntimeException("Didn't get expected failure!");
-   }
--
- //  @Ignore
    @Test
    public void exchangePartition() throws Exception {
      runStatementOnDriver("create database ex1");

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --cc ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 7a73a17,21b4a2c..bd6e6a0
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@@ -85,16 -84,16 +84,17 @@@ public class TestTxnCommands2 
  
    protected HiveConf hiveConf;
    protected Driver d;
-   protected static enum Table {
+   protected enum Table {
      ACIDTBL("acidTbl"),
-     ACIDTBLPART("acidTblPart"),
+     ACIDTBLPART("acidTblPart", "p"),
      NONACIDORCTBL("nonAcidOrcTbl"),
-     NONACIDPART("nonAcidPart"),
-     NONACIDPART2("nonAcidPart2"),
-     ACIDNESTEDPART("acidNestedPart"),
+     NONACIDPART("nonAcidPart", "p"),
+     NONACIDPART2("nonAcidPart2", "p2"),
 -    ACIDNESTEDPART("acidNestedPart", "p,q");
++    ACIDNESTEDPART("acidNestedPart", "p,q"),
 +    MMTBL("mmTbl");
  
      private final String name;
+     private final String partitionColumns;
      @Override
      public String toString() {
        return name;
@@@ -738,14 -841,13 +843,13 @@@
      // There should be only 1 directory left: base_0000001.
      // Original bucket files, delta directories and previous base directory should have been cleaned up.
      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-         (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+       (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
      Assert.assertEquals(1, status.length);
 -    Assert.assertEquals("base_0000023", status[0].getPath().getName());
 +    Assert.assertEquals("base_0000025", status[0].getPath().getName());
      FileStatus[] buckets = fs.listStatus(status[0].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
      Arrays.sort(buckets);
-     Assert.assertEquals(BUCKET_COUNT, buckets.length);
-     Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-     Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+     Assert.assertEquals(1, buckets.length);
+     Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
      rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
      resultData = new int[][] {{1, 3}, {3, 4}};
      Assert.assertEquals(stringifyValues(resultData), rs);
@@@ -1643,65 -1755,241 +1757,299 @@@
      Assert.assertEquals(stringifyValues(rExpected), r);
    }
  
+   @Test
+   public void testBucketCodec() throws Exception {
+     d.destroy();
+     //insert data in "legacy" format
+     hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 0);
+     d = new Driver(hiveConf);
+ 
+     int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+     runStatementOnDriver("insert into " + Table.ACIDTBL + " " + 
makeValuesClause(targetVals));
+ 
+     d.destroy();
+     hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 1);
+     d = new Driver(hiveConf);
+     //do some operations with new format
+     runStatementOnDriver("update " + Table.ACIDTBL + " set b=11 where a in 
(5,7)");
+     runStatementOnDriver("insert into " + Table.ACIDTBL + " values(11,11)");
+     runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7");
+ 
+     //make sure we get the right data back before/after compactions
+     List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL 
+ " order by a,b");
+     int[][] rExpected = {{2,1},{4,3},{5,11},{11,11}};
+     Assert.assertEquals(stringifyValues(rExpected), r);
+ 
+     runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'");
+     runWorker(hiveConf);
+ 
+     r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a,b");
+     Assert.assertEquals(stringifyValues(rExpected), r);
+ 
+     runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MAJOR'");
+     runWorker(hiveConf);
+ 
+     r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a,b");
+     Assert.assertEquals(stringifyValues(rExpected), r);
+   }
+   /**
+    * Test the scenario when an INSERT OVERWRITE (IOW) comes in before a MAJOR compaction happens
+    * @throws Exception
+    */
+   @Test
+   public void testInsertOverwrite1() throws Exception {
+     FileSystem fs = FileSystem.get(hiveConf);
+     FileStatus[] status;
+ 
+     // 1. Insert two rows to an ACID table
+     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) 
values(1,2)");
+     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) 
values(3,4)");
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs in the location
+     Assert.assertEquals(2, status.length);
+     for (int i = 0; i < status.length; i++) {
+       Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+     }
+ 
+     // 2. INSERT OVERWRITE
+     // Prepare data for the source table
+     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(5,6),(7,8)");
+     // Insert overwrite ACID table from source table
+     runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select 
a,b from " + Table.NONACIDORCTBL);
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs, plus a base dir in the location
+     Assert.assertEquals(3, status.length);
+     boolean sawBase = false;
+     String baseDir = "";
+     int deltaCount = 0;
+     for (int i = 0; i < status.length; i++) {
+       String dirName = status[i].getPath().getName();
+       if (dirName.matches("delta_.*")) {
+         deltaCount++;
+       } else {
+         sawBase = true;
+         baseDir = dirName;
+         Assert.assertTrue(baseDir.matches("base_.*"));
+       }
+     }
+     Assert.assertEquals(2, deltaCount);
+     Assert.assertTrue(sawBase);
+     // Verify query result
+     List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL 
+ " order by a");
+     int [][] resultData = new int[][] {{5,6},{7,8}};
+     Assert.assertEquals(stringifyValues(resultData), rs);
+ 
+     // 3. Perform a major compaction. Nothing should change. Both deltas and 
base dirs should have the same name.
+     // Re-verify directory layout and query result by using the same logic as 
above
+     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+     runWorker(hiveConf);
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs, plus a base dir in the location
+     Assert.assertEquals(3, status.length);
+     sawBase = false;
+     deltaCount = 0;
+     for (int i = 0; i < status.length; i++) {
+       String dirName = status[i].getPath().getName();
+       if (dirName.matches("delta_.*")) {
+         deltaCount++;
+       } else {
+         sawBase = true;
+         Assert.assertTrue(dirName.matches("base_.*"));
+         Assert.assertEquals(baseDir, dirName);
+       }
+     }
+     Assert.assertEquals(2, deltaCount);
+     Assert.assertTrue(sawBase);
+     // Verify query result
+     rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+ 
+     // 4. Run Cleaner. It should remove the 2 delta dirs.
+     runCleaner(hiveConf);
+     // There should be only 1 directory left: base_xxxxxxx.
+     // The delta dirs should have been cleaned up.
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     Assert.assertEquals(1, status.length);
+     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+     Assert.assertEquals(baseDir, status[0].getPath().getName());
+     // Verify query result
+     rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+   }
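The delta/base bookkeeping above recurs in several of these tests; a small, self-contained sketch of that classification loop (the directory names are made-up examples of the delta_*/base_* patterns the test matches; real names encode transaction ids assigned at run time):

import java.util.Arrays;
import java.util.List;

// Standalone illustration of the loop used to classify warehouse subdirectories.
public class DirLayoutDemo {
  public static void main(String[] args) {
    List<String> dirNames = Arrays.asList(
        "delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "base_0000003");
    int deltaCount = 0;
    String baseDir = null;
    for (String dirName : dirNames) {
      if (dirName.matches("delta_.*")) {
        deltaCount++;
      } else if (dirName.matches("base_.*")) {
        baseDir = dirName;
      }
    }
    System.out.println(deltaCount + " delta dir(s), base dir = " + baseDir);
  }
}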
+ 
+   /**
+    * Test the scenario when an INSERT OVERWRITE (IOW) comes in after a MAJOR compaction happens
+    * @throws Exception
+    */
+   @Test
+   public void testInsertOverwrite2() throws Exception {
+     FileSystem fs = FileSystem.get(hiveConf);
+     FileStatus[] status;
+ 
+     // 1. Insert two rows to an ACID table
+     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) 
values(1,2)");
+     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) 
values(3,4)");
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs in the location
+     Assert.assertEquals(2, status.length);
+     for (int i = 0; i < status.length; i++) {
+       Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+     }
+ 
+     // 2. Perform a major compaction. There should be an extra base dir now.
+     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+     runWorker(hiveConf);
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs, plus a base dir in the location
+     Assert.assertEquals(3, status.length);
+     boolean sawBase = false;
+     int deltaCount = 0;
+     for (int i = 0; i < status.length; i++) {
+       String dirName = status[i].getPath().getName();
+       if (dirName.matches("delta_.*")) {
+         deltaCount++;
+       } else {
+         sawBase = true;
+         Assert.assertTrue(dirName.matches("base_.*"));
+       }
+     }
+     Assert.assertEquals(2, deltaCount);
+     Assert.assertTrue(sawBase);
+     // Verify query result
+     int [][] resultData = new int[][] {{1,2},{3,4}};
+     List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL 
+ " order by a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+ 
+     // 3. INSERT OVERWRITE
+     // Prepare data for the source table
+     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(5,6),(7,8)");
+     // Insert overwrite ACID table from source table
+     runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select 
a,b from " + Table.NONACIDORCTBL);
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs, plus 2 base dirs in the location
+     Assert.assertEquals(4, status.length);
+     int baseCount = 0;
+     deltaCount = 0;
+     for (int i = 0; i < status.length; i++) {
+       String dirName = status[i].getPath().getName();
+       if (dirName.matches("delta_.*")) {
+         deltaCount++;
+       } else {
+         baseCount++;
+       }
+     }
+     Assert.assertEquals(2, deltaCount);
+     Assert.assertEquals(2, baseCount);
+     // Verify query result
+     resultData = new int[][] {{5,6},{7,8}};
+     rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+ 
+     // 4. Perform another major compaction. Nothing should change. Both 
deltas and both base dirs
+     // should have the same name.
+     // Re-verify directory layout and query result by using the same logic as 
above
+     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+     runWorker(hiveConf);
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     // There should be 2 delta dirs, plus 2 base dirs in the location
+     Assert.assertEquals(4, status.length);
+     baseCount = 0;
+     deltaCount = 0;
+     for (int i = 0; i < status.length; i++) {
+       String dirName = status[i].getPath().getName();
+       if (dirName.matches("delta_.*")) {
+         deltaCount++;
+       } else {
+         Assert.assertTrue(dirName.matches("base_.*"));
+         baseCount++;
+       }
+     }
+     Assert.assertEquals(2, deltaCount);
+     Assert.assertEquals(2, baseCount);
+     // Verify query result
+     rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+ 
+     // 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir.
+     runCleaner(hiveConf);
+     // There should be only 1 directory left: base_xxxxxxx.
+     // The delta dirs should have been cleaned up.
+     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+         (Table.ACIDTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
+     Assert.assertEquals(1, status.length);
+     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+     // Verify query result
+     rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by 
a");
+     Assert.assertEquals(stringifyValues(resultData), rs);
+   }
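Step 3 above leaves two base_* directories until the Cleaner runs; conceptually, readers use the newest base, i.e. the one with the highest transaction id in its name. A simplified, hedged sketch of that idea with made-up directory names (this is not the actual AcidUtils selection logic):

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified illustration: among several base_N directories, the highest N wins.
public class NewestBaseDemo {
  public static void main(String[] args) {
    List<String> dirs =
        Arrays.asList("base_0000003", "delta_0000004_0000004_0000", "base_0000005");
    String newestBase = dirs.stream()
        .filter(d -> d.matches("base_\\d+"))
        .max(Comparator.comparingLong((String d) -> Long.parseLong(d.substring("base_".length()))))
        .orElse(null);
    System.out.println("read from " + newestBase); // read from base_0000005
  }
}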
+ 
    /**
 +   * Test compaction for a micro-managed (MM, insert-only) table
 +   * 1. Regular compaction shouldn't impact any valid subdirectories of MM 
tables
 +   * 2. Compactions will only remove subdirectories for aborted transactions 
of MM tables, if any
 +   * @throws Exception
 +   */
 +  @Test
 +  public void testMmTableCompaction() throws Exception {
 +    // 1. Insert some rows into MM table
 +    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
 +    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
 +    // There should be 2 delta directories
 +    verifyDirAndResult(2);
 +
 +    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs 
should stay.
 +    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
 +    runWorker(hiveConf);
 +    verifyDirAndResult(2);
 +
 +    // 3. Let a transaction be aborted
 +    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
 +    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
 +    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
 +    // There should be 3 delta directories. The new one is the aborted one.
 +    verifyDirAndResult(3);
 +
 +    // 4. Perform a MINOR compaction again. This time it will remove the 
subdir for the aborted transaction.
 +    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
 +    runWorker(hiveConf);
 +    // The worker should remove the subdir for the aborted transaction
 +    verifyDirAndResult(2);
 +
 +    // 5. Run Cleaner. Shouldn't impact anything.
 +    runCleaner(hiveConf);
 +    verifyDirAndResult(2);
 +  }
 +
 +  private void verifyDirAndResult(int expectedDeltas) throws Exception {
 +    FileSystem fs = FileSystem.get(hiveConf);
 +    // Verify the content of subdirs
 +    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
 +        (Table.MMTBL).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
 +    int sawDeltaTimes = 0;
 +    for (int i = 0; i < status.length; i++) {
 +      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
 +      sawDeltaTimes++;
 +      FileStatus[] files = fs.listStatus(status[i].getPath(), 
FileUtils.STAGING_DIR_PATH_FILTER);
 +      Assert.assertEquals(1, files.length);
 +      Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
 +    }
 +    Assert.assertEquals(expectedDeltas, sawDeltaTimes);
 +
 +    // Verify query result
 +    int [][] resultData = new int[][] {{1,2}, {3,4}};
 +    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL);
 +    Assert.assertEquals(stringifyValues(resultData), rs);
 +  }
 +
 +  /**
     * takes raw data and turns it into a string as if from Driver.getResults()
     * sorts rows in dictionary order
     */

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --cc 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6b19eb1,4c30732..ccd7d8e
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@@ -40,10 -46,15 +46,7 @@@ import java.util.TreeSet
  
  import org.apache.commons.codec.binary.Base64;
  import org.apache.hadoop.conf.Configuration;
 -import org.apache.hadoop.fs.BlockLocation;
 -import org.apache.hadoop.fs.FSDataInputStream;
--import org.apache.hadoop.fs.FSDataOutputStream;
 -import org.apache.hadoop.fs.FSInputStream;
--import org.apache.hadoop.fs.FileStatus;
--import org.apache.hadoop.fs.FileSystem;
 -import org.apache.hadoop.fs.LocatedFileStatus;
--import org.apache.hadoop.fs.Path;
 -import org.apache.hadoop.fs.RemoteIterator;
++import org.apache.hadoop.fs.*;
  import org.apache.hadoop.fs.permission.FsPermission;
  import org.apache.hadoop.hive.common.ValidTxnList;
  import org.apache.hadoop.hive.common.type.HiveDecimal;

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --cc ql/src/test/queries/clientpositive/mm_all.q
index 6001e9f,0000000..e23260f
mode 100644,000000..100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@@ -1,370 -1,0 +1,325 @@@
 +set hive.mapred.mode=nonstrict;
 +set hive.explain.user=false;
 +set hive.fetch.task.conversion=none;
 +set tez.grouping.min-size=1;
 +set tez.grouping.max-size=2;
 +set hive.exec.dynamic.partition.mode=nonstrict;
 +set hive.support.concurrency=true;
 +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 +
 +
 +-- Force multiple writers when reading
 +drop table intermediate;
 +create table intermediate(key int) partitioned by (p int) stored as orc;
 +insert into table intermediate partition(p='455') select distinct key from 
src where key >= 0 order by key desc limit 2;
 +insert into table intermediate partition(p='456') select distinct key from 
src where key is not null order by key asc limit 2;
 +insert into table intermediate partition(p='457') select distinct key from 
src where key >= 100 order by key asc limit 2;
 +
 +
 +drop table part_mm;
 +create table part_mm(key int) partitioned by (key_mm int) stored as orc 
tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +explain insert into table part_mm partition(key_mm=455) select key from 
intermediate;
 +insert into table part_mm partition(key_mm=455) select key from intermediate;
 +insert into table part_mm partition(key_mm=456) select key from intermediate;
 +insert into table part_mm partition(key_mm=455) select key from intermediate;
 +select * from part_mm order by key, key_mm;
 +
 +-- TODO: doesn't work truncate table part_mm partition(key_mm=455);
 +select * from part_mm order by key, key_mm;
 +truncate table part_mm;
 +select * from part_mm order by key, key_mm;
 +drop table part_mm;
 +
 +drop table simple_mm;
 +create table simple_mm(key int) stored as orc tblproperties 
("transactional"="true", "transactional_properties"="insert_only");
 +insert into table simple_mm select key from intermediate;
 +select * from simple_mm order by key;
 +insert into table simple_mm select key from intermediate;
 +select * from simple_mm order by key;
 +truncate table simple_mm;
 +select * from simple_mm;
 +drop table simple_mm;
 +
 +
 +-- simple DP (no bucketing)
 +drop table dp_mm;
 +
 +set hive.exec.dynamic.partition.mode=nonstrict;
 +
 +set hive.merge.mapredfiles=false;
 +set hive.merge.sparkfiles=false;
 +set hive.merge.tezfiles=false;
 +
 +create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as 
orc
 +  tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +
 +insert into table dp_mm partition (key1='123', key2) select key, key from 
intermediate;
 +
 +select * from dp_mm order by key;
 +
 +drop table dp_mm;
 +
 +
 +-- union
 +
 +create table union_mm(id int)  tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +insert into table union_mm 
 +select temps.p from (
 +select key as p from intermediate 
 +union all 
 +select key + 1 as p from intermediate ) temps;
 +
 +select * from union_mm order by id;
 +
 +insert into table union_mm 
 +select p from
 +(
 +select key + 1 as p from intermediate
 +union all
 +select key from intermediate
 +) tab group by p
 +union all
 +select key + 2 as p from intermediate;
 +
 +select * from union_mm order by id;
 +
 +insert into table union_mm
 +SELECT p FROM
 +(
 +  SELECT key + 1 as p FROM intermediate
 +  UNION ALL
 +  SELECT key as p FROM ( 
 +    SELECT distinct key FROM (
 +      SELECT key FROM (
 +        SELECT key + 2 as key FROM intermediate
 +        UNION ALL
 +        SELECT key FROM intermediate
 +      )t1 
 +    group by key)t2
 +  )t3
 +)t4
 +group by p;
 +
 +
 +select * from union_mm order by id;
 +drop table union_mm;
 +
 +
 +create table partunion_mm(id int) partitioned by (key int) tblproperties 
("transactional"="true", "transactional_properties"="insert_only");
 +insert into table partunion_mm partition(key)
 +select temps.* from (
 +select key as p, key from intermediate 
 +union all 
 +select key + 1 as p, key + 1 from intermediate ) temps;
 +
 +select * from partunion_mm order by id;
 +drop table partunion_mm;
 +
 +
 +
 +create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on 
((0,0),(1,1),(2,2),(3,3))
 + stored as directories tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +
 +insert into table skew_mm 
 +select key, key, key from intermediate;
 +
 +select * from skew_mm order by k2, k1, k4;
 +drop table skew_mm;
 +
 +
 +create table skew_dp_union_mm(k1 int, k2 int, k4 int) partitioned by (k3 int) 
 +skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) stored as directories 
tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +
 +insert into table skew_dp_union_mm partition (k3)
 +select key as i, key as j, key as k, key as l from intermediate
 +union all 
 +select key +1 as i, key +2 as j, key +3 as k, key +4 as l from intermediate;
 +
 +
 +select * from skew_dp_union_mm order by k2, k1, k4;
 +drop table skew_dp_union_mm;
 +
 +
 +
 +set hive.merge.orcfile.stripe.level=true;
 +set hive.merge.tezfiles=true;
 +set hive.merge.mapfiles=true;
 +set hive.merge.mapredfiles=true;
 +
 +
 +create table merge0_mm (id int) stored as orc 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
 +
 +insert into table merge0_mm select key from intermediate;
 +select * from merge0_mm;
 +
 +set tez.grouping.split-count=1;
 +insert into table merge0_mm select key from intermediate;
 +set tez.grouping.split-count=0;
 +select * from merge0_mm;
 +
 +drop table merge0_mm;
 +
 +
 +create table merge2_mm (id int) tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
 +
 +insert into table merge2_mm select key from intermediate;
 +select * from merge2_mm;
 +
 +set tez.grouping.split-count=1;
 +insert into table merge2_mm select key from intermediate;
 +set tez.grouping.split-count=0;
 +select * from merge2_mm;
 +
 +drop table merge2_mm;
 +
 +
 +create table merge1_mm (id int) partitioned by (key int) stored as orc 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
 +
 +insert into table merge1_mm partition (key) select key, key from intermediate;
 +select * from merge1_mm order by id, key;
 +
 +set tez.grouping.split-count=1;
 +insert into table merge1_mm partition (key) select key, key from intermediate;
 +set tez.grouping.split-count=0;
 +select * from merge1_mm order by id, key;
 +
 +drop table merge1_mm;
 +
 +set hive.merge.tezfiles=false;
 +set hive.merge.mapfiles=false;
 +set hive.merge.mapredfiles=false;
 +
 +-- TODO: need to include merge+union+DP, but it's broken for now
 +
 +
 +drop table ctas0_mm;
 +create table ctas0_mm tblproperties ("transactional"="true", 
"transactional_properties"="insert_only") as select * from intermediate;
 +select * from ctas0_mm;
 +drop table ctas0_mm;
 +
 +drop table ctas1_mm;
 +create table ctas1_mm tblproperties ("transactional"="true", 
"transactional_properties"="insert_only") as
 +  select * from intermediate union all select * from intermediate;
 +select * from ctas1_mm;
 +drop table ctas1_mm;
 +
 +
- drop table load0_mm;
- create table load0_mm (key string, value string) stored as textfile 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
- load data local inpath '../../data/files/kv1.txt' into table load0_mm;
- select count(1) from load0_mm;
- load data local inpath '../../data/files/kv2.txt' into table load0_mm;
- select count(1) from load0_mm;
- load data local inpath '../../data/files/kv2.txt' overwrite into table 
load0_mm;
- select count(1) from load0_mm;
- drop table load0_mm;
- 
- 
- drop table intermediate2;
- create table intermediate2 (key string, value string) stored as textfile
- location 'file:${system:test.tmp.dir}/intermediate2';
- load data local inpath '../../data/files/kv1.txt' into table intermediate2;
- load data local inpath '../../data/files/kv2.txt' into table intermediate2;
- load data local inpath '../../data/files/kv3.txt' into table intermediate2;
- 
- drop table load1_mm;
- create table load1_mm (key string, value string) stored as textfile 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
- load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' into 
table load1_mm;
- load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv1.txt' into 
table load1_mm;
- select count(1) from load1_mm;
- load data local inpath '../../data/files/kv1.txt' into table intermediate2;
- load data local inpath '../../data/files/kv2.txt' into table intermediate2;
- load data local inpath '../../data/files/kv3.txt' into table intermediate2;
- load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' 
overwrite into table load1_mm;
- select count(1) from load1_mm;
- load data local inpath '../../data/files/kv2.txt' into table intermediate2;
- load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' 
overwrite into table load1_mm;
- select count(1) from load1_mm;
- drop table load1_mm;
- 
- drop table load2_mm;
- create table load2_mm (key string, value string)
-   partitioned by (k int, l int) stored as textfile 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
- load data local inpath '../../data/files/kv1.txt' into table intermediate2;
- load data local inpath '../../data/files/kv2.txt' into table intermediate2;
- load data local inpath '../../data/files/kv3.txt' into table intermediate2;
- load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' into 
table load2_mm partition(k=5, l=5);
- select count(1) from load2_mm;
- drop table load2_mm;
- drop table intermediate2;
- 
- 
 +drop table multi0_1_mm;
 +drop table multi0_2_mm;
 +create table multi0_1_mm (key int, key2 int)  
tblproperties("transactional"="true", "transactional_properties"="insert_only");
 +create table multi0_2_mm (key int, key2 int)  
tblproperties("transactional"="true", "transactional_properties"="insert_only");
 +
 +--from intermediate
 +--insert overwrite table multi0_1_mm select key, p
 +--insert overwrite table multi0_2_mm select p, key;
 +insert into table multi0_1_mm select key, p from intermediate;
 +insert into table multi0_2_mm select p, key from intermediate;
 +
 +select * from multi0_1_mm order by key, key2;
 +select * from multi0_2_mm order by key, key2;
 +
 +set hive.merge.mapredfiles=true;
 +set hive.merge.sparkfiles=true;
 +set hive.merge.tezfiles=true;
 +
 +--from intermediate
 +--insert into table multi0_1_mm select p, key
 +--insert overwrite table multi0_2_mm select key, p;
 +insert into table multi0_1_mm select p, key from intermediate;
 +insert into table multi0_2_mm select key, p from intermediate;
 +select * from multi0_1_mm order by key, key2;
 +select * from multi0_2_mm order by key, key2;
 +
 +set hive.merge.mapredfiles=false;
 +set hive.merge.sparkfiles=false;
 +set hive.merge.tezfiles=false;
 +
 +drop table multi0_1_mm;
 +drop table multi0_2_mm;
 +
 +
 +drop table multi1_mm;
 +create table multi1_mm (key int, key2 int) partitioned by (p int) 
tblproperties("transactional"="true", "transactional_properties"="insert_only");
 +from intermediate
 +insert into table multi1_mm partition(p=1) select p, key
 +insert into table multi1_mm partition(p=2) select key, p;
 +select * from multi1_mm order by key, key2, p;
 +--from intermediate
 +--insert into table multi1_mm partition(p=2) select p, key
 +--insert overwrite table multi1_mm partition(p=1) select key, p;
 +insert into table multi1_mm partition(p=2) select p, key from intermediate;
 +insert into table multi1_mm partition(p=1) select key, p from intermediate;
 +select * from multi1_mm order by key, key2, p;
 +
 +from intermediate
 +insert into table multi1_mm partition(p) select p, key, p
 +insert into table multi1_mm partition(p=1) select key, p;
 +select key, key2, p from multi1_mm order by key, key2, p;
 +
 +from intermediate
 +insert into table multi1_mm partition(p) select p, key, 1
 +insert into table multi1_mm partition(p=1) select key, p;
 +select key, key2, p from multi1_mm order by key, key2, p;
 +drop table multi1_mm;
 +
 +
 +
 +
 +set datanucleus.cache.collections=false;
 +set hive.stats.autogather=true;
 +
 +drop table stats_mm;
 +create table stats_mm(key int)  tblproperties("transactional"="true", 
"transactional_properties"="insert_only");
 +--insert overwrite table stats_mm  select key from intermediate;
 +insert into table stats_mm  select key from intermediate;
 +desc formatted stats_mm;
 +
 +insert into table stats_mm  select key from intermediate;
 +desc formatted stats_mm;
 +drop table stats_mm;
 +
 +drop table stats2_mm;
 +create table stats2_mm tblproperties("transactional"="true", 
"transactional_properties"="insert_only") as select array(key, value) from src;
 +desc formatted stats2_mm;
 +drop table stats2_mm;
 +
 +
 +set hive.optimize.skewjoin=true;
 +set hive.skewjoin.key=2;
 +set hive.optimize.metadataonly=false;
 +
 +CREATE TABLE skewjoin_mm(key INT, value STRING) STORED AS TEXTFILE 
tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +FROM src src1 JOIN src src2 ON (src1.key = src2.key) INSERT into TABLE 
skewjoin_mm SELECT src1.key, src2.value;
 +select count(distinct key) from skewjoin_mm;
 +drop table skewjoin_mm;
 +
 +set hive.optimize.skewjoin=false;
 +
 +set hive.optimize.index.filter=true;
 +set hive.auto.convert.join=false;
 +CREATE TABLE parquet1_mm(id INT) STORED AS PARQUET tblproperties 
("transactional"="true", "transactional_properties"="insert_only");
 +INSERT INTO parquet1_mm VALUES(1), (2);
 +CREATE TABLE parquet2_mm(id INT, value STRING) STORED AS PARQUET 
tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 +INSERT INTO parquet2_mm VALUES(1, 'value1');
 +INSERT INTO parquet2_mm VALUES(1, 'value2');
 +select parquet1_mm.id, t1.value, t2.value FROM parquet1_mm
 +  JOIN parquet2_mm t1 ON parquet1_mm.id=t1.id
 +  JOIN parquet2_mm t2 ON parquet1_mm.id=t2.id
 +where t1.value = 'value1' and t2.value = 'value2';
 +drop table parquet1_mm;
 +drop table parquet2_mm;
 +
 +set hive.auto.convert.join=true;
 +
 +
 +DROP TABLE IF EXISTS temp1;
 +CREATE TEMPORARY TABLE temp1 (a int) TBLPROPERTIES ("transactional"="true", 
"transactional_properties"="insert_only");
 +INSERT INTO temp1 SELECT key FROM intermediate;
 +DESC EXTENDED temp1;
 +SELECT * FROM temp1;
 +
 +
 +drop table intermediate;
 +
 +
 +
