http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index d4b29d5..d67b1e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -27,8 +27,8 @@ import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -72,7 +72,7 @@ public class VectorizedOrcAcidRowBatchReader
   protected float progress = 0.0f;
   protected Object[] partitionValues;
   private boolean addPartitionCols = true;
-  private final ValidTxnList validTxnList;
+  private final ValidWriteIdList validWriteIdList;
   private final DeleteEventRegistry deleteEventRegistry;
   /**
    * {@link RecordIdentifier}/{@link VirtualColumn#ROWID} information
@@ -183,8 +183,10 @@ public class VectorizedOrcAcidRowBatchReader
       partitionValues = null;
     }
 
-    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-    this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new 
ValidReadTxnList(txnString);
+    String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+    this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() 
: new ValidReaderWriteIdList(txnString);
+    LOG.debug("VectorizedOrcAcidRowBatchReader:: Read ValidWriteIdList: " + 
this.validWriteIdList.toString()
+            + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
 
     // Clone readerOptions for deleteEvents.
     Reader.Options deleteEventReaderOptions = readerOptions.clone();
@@ -214,7 +216,7 @@ public class VectorizedOrcAcidRowBatchReader
     }
     rowIdProjected = areRowIdsProjected(rbCtx);
     rootPath = orcSplit.getRootDir();
-    syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList);
+    syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
   }
 
   /**
@@ -242,8 +244,8 @@ public class VectorizedOrcAcidRowBatchReader
    * before/during split computation and passing the info in the split.  
(HIVE-17917)
    */
   private OffsetAndBucketProperty computeOffsetAndBucket(
-    OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException 
{
-    if(!needSyntheticRowIds(split.isOriginal(), 
!deleteEventRegistry.isEmpty(), rowIdProjected)) {
+      OrcSplit split, JobConf conf, ValidWriteIdList validWriteIdList) throws 
IOException {
+    if (!needSyntheticRowIds(split.isOriginal(), 
!deleteEventRegistry.isEmpty(), rowIdProjected)) {
       if(split.isOriginal()) {
         /**
          * Even if we don't need to project ROW_IDs, we still need to check 
the transaction ID that
@@ -252,22 +254,20 @@ public class VectorizedOrcAcidRowBatchReader
          * filter out base/delta files but this makes fewer dependencies)
          */
         OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
-          
OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
-            split.getRootDir(), conf);
-        return new OffsetAndBucketProperty(-1,-1,
-          syntheticTxnInfo.syntheticTransactionId);
+            
OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(),
+                    split.getRootDir(), conf);
+        return new OffsetAndBucketProperty(-1,-1, 
syntheticTxnInfo.syntheticWriteId);
       }
       return null;
     }
     long rowIdOffset = 0;
     OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
-      
OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
-        split.getRootDir(), conf);
+        
OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(),
 split.getRootDir(), conf);
     int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), 
conf).getBucketId();
     int bucketProperty = BucketCodec.V1.encode(new 
AcidOutputFormat.Options(conf)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
     AcidUtils.Directory directoryState = AcidUtils.getAcidState( 
syntheticTxnInfo.folder, conf,
-      validTxnList, false, true);
+        validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : 
directoryState.getOriginalFiles()) {
       AcidOutputFormat.Options bucketOptions =
         AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), 
conf);
@@ -283,7 +283,7 @@ public class VectorizedOrcAcidRowBatchReader
       rowIdOffset += reader.getNumberOfRows();
     }
     return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
-      syntheticTxnInfo.syntheticTransactionId);
+      syntheticTxnInfo.syntheticWriteId);
   }
   /**
    * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized 
reads of acid tables.
@@ -426,7 +426,7 @@ public class VectorizedOrcAcidRowBatchReader
             " to handle original files that require ROW__IDs: " + rootPath);
         }
         /**
-         * {@link RecordIdentifier#getTransactionId()}
+         * {@link RecordIdentifier#getWriteId()}
          */
         recordIdColumnVector.fields[0].noNulls = true;
         recordIdColumnVector.fields[0].isRepeating = true;
@@ -450,11 +450,11 @@ public class VectorizedOrcAcidRowBatchReader
         }
         //Now populate a structure to use to apply delete events
         innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
-        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = 
recordIdColumnVector.fields[0];
+        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = 
recordIdColumnVector.fields[0];
         innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = 
recordIdColumnVector.fields[1];
         innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = 
recordIdColumnVector.fields[2];
         //these are insert events so (original txn == current) txn for all rows
-        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = 
recordIdColumnVector.fields[0];
+        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = 
recordIdColumnVector.fields[0];
       }
       if(syntheticProps.syntheticTxnId > 0) {
         //"originals" (written before table was converted to acid) is 
considered written by
@@ -470,7 +470,7 @@ public class VectorizedOrcAcidRowBatchReader
           * reader (transactions) is concerned.  Since here we are reading 
'original' schema file,
           * all rows in it have been created by the same txn, namely 
'syntheticProps.syntheticTxnId'
           */
-          if (!validTxnList.isTxnValid(syntheticProps.syntheticTxnId)) {
+          if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) 
{
             selectedBitSet.clear(0, vectorizedRowBatchBase.size);
           }
         }
@@ -514,7 +514,7 @@ public class VectorizedOrcAcidRowBatchReader
       // Transfer columnVector objects from base batch to outgoing batch.
       System.arraycopy(payloadStruct.fields, 0, value.cols, 0, 
value.getDataColumnCount());
       if(rowIdProjected) {
-        recordIdColumnVector.fields[0] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+        recordIdColumnVector.fields[0] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
         recordIdColumnVector.fields[1] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
         recordIdColumnVector.fields[2] = 
vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
       }
@@ -531,24 +531,24 @@ public class VectorizedOrcAcidRowBatchReader
   }
 
   private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int 
size, BitSet selectedBitSet) {
-    if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
+    if (cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating) {
       // When we have repeating values, we can unset the whole bitset at once
       // if the repeating value is not a valid transaction.
       long currentTransactionIdForBatch = ((LongColumnVector)
-          cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0];
-      if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) {
+          cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[0];
+      if (!validWriteIdList.isWriteIdValid(currentTransactionIdForBatch)) {
         selectedBitSet.clear(0, size);
       }
       return;
     }
     long[] currentTransactionVector =
-        ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector;
+        ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_WRITEID]).vector;
     // Loop through the bits that are set to true and mark those rows as 
false, if their
     // current transactions are not valid.
     for (int setBitIndex = selectedBitSet.nextSetBit(0);
         setBitIndex >= 0;
         setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
-      if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) {
+      if 
(!validWriteIdList.isWriteIdValid(currentTransactionVector[setBitIndex])) {
         selectedBitSet.clear(setBitIndex);
       }
    }
@@ -630,30 +630,33 @@ public class VectorizedOrcAcidRowBatchReader
     private OrcRawRecordMerger.ReaderKey deleteRecordKey;
     private OrcStruct deleteRecordValue;
     private Boolean isDeleteRecordAvailable = null;
-    private ValidTxnList validTxnList;
+    private ValidWriteIdList validWriteIdList;
 
     SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, 
Reader.Options readerOptions)
-      throws IOException {
-        final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltas.length > 0) {
-          int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), 
conf).getBucketId();
-          String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-          this.validTxnList = (txnString == null) ? new ValidReadTxnList() : 
new ValidReadTxnList(txnString);
-          OrcRawRecordMerger.Options mergerOptions = new 
OrcRawRecordMerger.Options().isDeleteReader(true);
-          assert !orcSplit.isOriginal() : "If this now supports Original 
splits, set up mergeOptions properly";
-          this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, 
bucket,
-                                                      validTxnList, 
readerOptions, deleteDeltas,
-                                                      mergerOptions);
-          this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
-          this.deleteRecordValue = this.deleteRecords.createValue();
-          // Initialize the first value in the delete reader.
-          this.isDeleteRecordAvailable = 
this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
-        } else {
-          this.isDeleteRecordAvailable = false;
-          this.deleteRecordKey = null;
-          this.deleteRecordValue = null;
-          this.deleteRecords = null;
-        }
+            throws IOException {
+      final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
+      if (deleteDeltas.length > 0) {
+        int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), 
conf).getBucketId();
+        String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+        this.validWriteIdList
+                = (txnString == null) ? new ValidReaderWriteIdList() : new 
ValidReaderWriteIdList(txnString);
+        LOG.debug("SortMergedDeleteEventRegistry:: Read ValidWriteIdList: " + 
this.validWriteIdList.toString()
+                + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
+        OrcRawRecordMerger.Options mergerOptions = new 
OrcRawRecordMerger.Options().isDeleteReader(true);
+        assert !orcSplit.isOriginal() : "If this now supports Original splits, 
set up mergeOptions properly";
+        this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, 
bucket,
+                                                    validWriteIdList, 
readerOptions, deleteDeltas,
+                                                    mergerOptions);
+        this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
+        this.deleteRecordValue = this.deleteRecords.createValue();
+        // Initialize the first value in the delete reader.
+        this.isDeleteRecordAvailable = 
this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
+      } else {
+        this.isDeleteRecordAvailable = false;
+        this.deleteRecordKey = null;
+        this.deleteRecordValue = null;
+        this.deleteRecords = null;
+      }
     }
 
     @Override
@@ -671,8 +674,8 @@ public class VectorizedOrcAcidRowBatchReader
       }
 
       long[] originalTransaction =
-          cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
-              : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+          cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null
+              : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector;
       long[] bucket =
           cols[OrcRecordUpdater.BUCKET].isRepeating ? null
               : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector;
@@ -682,7 +685,7 @@ public class VectorizedOrcAcidRowBatchReader
 
       // The following repeatedX values will be set, if any of the columns are 
repeating.
       long repeatedOriginalTransaction = (originalTransaction != null) ? -1
-          : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+          : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0];
       long repeatedBucket = (bucket != null) ? -1
           : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0];
       long repeatedRowId = (rowId != null) ? -1
@@ -828,12 +831,12 @@ public class VectorizedOrcAcidRowBatchReader
       private final RecordReader recordReader;
       private int indexPtrInBatch;
       private final int bucketForSplit; // The bucket value should be same for 
all the records.
-      private final ValidTxnList validTxnList;
+      private final ValidWriteIdList validWriteIdList;
       private boolean isBucketPropertyRepeating;
       private final boolean isBucketedTable;
 
       DeleteReaderValue(Reader deleteDeltaReader, Reader.Options 
readerOptions, int bucket,
-          ValidTxnList validTxnList, boolean isBucketedTable) throws 
IOException {
+          ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws 
IOException {
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
         this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -841,7 +844,7 @@ public class VectorizedOrcAcidRowBatchReader
           this.batch = null; // Oh! the first batch itself was null. Close the 
reader.
         }
         this.indexPtrInBatch = 0;
-        this.validTxnList = validTxnList;
+        this.validWriteIdList = validWriteIdList;
         this.isBucketedTable = isBucketedTable;
         checkBucketId();//check 1st batch
       }
@@ -866,7 +869,7 @@ public class VectorizedOrcAcidRowBatchReader
             checkBucketId(deleteRecordKey.bucketProperty);
           }
           ++indexPtrInBatch;
-          if (validTxnList.isTxnValid(currentTransaction)) {
+          if (validWriteIdList.isWriteIdValid(currentTransaction)) {
             isValidNext = true;
           }
         }
@@ -878,17 +881,17 @@ public class VectorizedOrcAcidRowBatchReader
       }
       private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) {
         int originalTransactionIndex =
-          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : 
indexPtrInBatch;
-        long originalTransaction =
-          ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+          batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? 0 : 
indexPtrInBatch;
+        long originalTransaction
+                = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[originalTransactionIndex];
         int bucketPropertyIndex =
           batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : 
indexPtrInBatch;
         int bucketProperty = 
(int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex];
         long rowId = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
-        int currentTransactionIndex =
-          batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : 
indexPtrInBatch;
-        long currentTransaction =
-          ((LongColumnVector) 
batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+        int currentTransactionIndex
+                = batch.cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating ? 0 
: indexPtrInBatch;
+        long currentTransaction
+                = ((LongColumnVector) 
batch.cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[currentTransactionIndex];
         deleteRecordKey.set(originalTransaction, bucketProperty, rowId);
         return currentTransaction;
       }
@@ -976,14 +979,17 @@ public class VectorizedOrcAcidRowBatchReader
     private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
     private long rowIds[];
     private CompressedOtid compressedOtids[];
-    private ValidTxnList validTxnList;
+    private ValidWriteIdList validWriteIdList;
     private Boolean isEmpty = null;
 
     ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, 
DeleteEventsOverflowMemoryException {
       int bucket = 
AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), 
conf).getBucketId();
-      String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-      this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new 
ValidReadTxnList(txnString);
+      String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+      this.validWriteIdList
+              = (txnString == null) ? new ValidReaderWriteIdList() : new 
ValidReaderWriteIdList(txnString);
+      LOG.debug("ColumnizedDeleteEventRegistry:: Read ValidWriteIdList: " + 
this.validWriteIdList.toString()
+              + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
       this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
       this.rowIds = null;
       this.compressedOtids = null;
@@ -1025,7 +1031,7 @@ public class VectorizedOrcAcidRowBatchReader
                 throw new DeleteEventsOverflowMemoryException();
               }
               DeleteReaderValue deleteReaderValue = new 
DeleteReaderValue(deleteDeltaReader,
-                  readerOptions, bucket, validTxnList, isBucketedTable);
+                  readerOptions, bucket, validWriteIdList, isBucketedTable);
               DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
               if (deleteReaderValue.next(deleteRecordKey)) {
                 sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -1165,10 +1171,10 @@ public class VectorizedOrcAcidRowBatchReader
       // check if it is deleted or not.
 
       long[] originalTransactionVector =
-          cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
-              : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+          cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null
+              : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector;
       long repeatedOriginalTransaction = (originalTransactionVector != null) ? 
-1
-          : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+          : ((LongColumnVector) 
cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0];
 
       long[] bucketProperties =
         cols[OrcRecordUpdater.BUCKET].isRepeating ? null
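
Taken together, the hunks above switch the reader's row-visibility checks from transaction ids (ValidTxnList / ValidReadTxnList, keyed by VALID_TXNS_KEY) to table write ids (ValidWriteIdList / ValidReaderWriteIdList, keyed by VALID_WRITEIDS_KEY). A minimal sketch of that reader-side pattern, using only the APIs that appear in this diff; the class and method names below are illustrative and not part of the patch:

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.mapred.JobConf;

    class WriteIdVisibilitySketch {
      // Resolve the per-table write-id snapshot that the planner serialized into the job conf.
      static ValidWriteIdList readSnapshot(JobConf conf) {
        String writeIds = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
        return (writeIds == null)
            ? new ValidReaderWriteIdList()          // no snapshot present in the conf
            : new ValidReaderWriteIdList(writeIds); // parse the serialized snapshot
      }

      // A row (or delete event) is kept only if the write id that produced it is
      // valid in the snapshot; otherwise the reader clears its bit in the selected set.
      static boolean isVisible(ValidWriteIdList snapshot, long writeId) {
        return snapshot.isWriteIdValid(writeId);
      }
    }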

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 5bbfe95..683aa95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
@@ -50,7 +51,9 @@ import org.apache.thrift.TException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -84,18 +87,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * transaction id.  Thus is 1 is first transaction id.
    */
   private volatile long txnId = 0;
+
+  /**
+   * The local cache of table write IDs allocated/created by the current 
transaction
+   */
+  private Map<String, Long> tableWriteIds = new HashMap<>();
+
   /**
    * assigns a unique monotonically increasing ID to each statement
    * which is part of an open transaction.  This is used by storage
    * layer (see {@link 
org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
    * to keep apart multiple writes of the same data within the same transaction
-   * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
+   * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}.
    */
-  private int writeId = -1;
+  private int stmtId = -1;
+
   /**
-   * counts number of statements in the current transaction
+   * counts number of statements in the current transaction.
    */
   private int numStatements = 0;
+
   /**
    * if {@code true} it means current transaction is started via START 
TRANSACTION which means it cannot
    * include any Operations which cannot be rolled back (drop partition; write 
to  non-acid table).
@@ -125,9 +136,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl 
{
    *
    * As a side note: what should the lock manager do with locks for 
non-transactional resources?
    * Should it it release them at the end of the stmt or txn?
-   * Some interesting thoughts: 
http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+   * Some interesting thoughts: 
http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html.
    */
   private boolean isExplicitTransaction = false;
+
   /**
    * To ensure transactions don't nest.
    */
@@ -141,6 +153,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   private ScheduledFuture<?> heartbeatTask = null;
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
+
   /**
    * We do this on every call to make sure TM uses same MS connection as is 
used by the caller (Driver,
    * SemanticAnalyzer, etc).  {@code Hive} instances are cached using 
ThreadLocal and
@@ -208,8 +221,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
     try {
       txnId = getMS().openTxn(user);
-      writeId = 0;
+      stmtId = 0;
       numStatements = 0;
+      tableWriteIds.clear();
       isExplicitTransaction = false;
       startTransactionCount = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
@@ -241,7 +255,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
         txnId = 0;
-        writeId = -1;
+        stmtId = -1;
+        tableWriteIds.clear();
       }
       throw e;
     }
@@ -597,8 +612,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
+      stmtId = -1;
       numStatements = 0;
+      tableWriteIds.clear();
     }
   }
 
@@ -622,8 +638,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
+      stmtId = -1;
       numStatements = 0;
+      tableWriteIds.clear();
     }
   }
 
@@ -743,12 +760,24 @@ public final class DbTxnManager extends 
HiveTxnManagerImpl {
 
   @Override
   public ValidTxnList getValidTxns() throws LockException {
+    assert isTxnOpen();
     init();
     try {
       return getMS().getValidTxns(txnId);
     } catch (TException e) {
-      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
-          e);
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
+  @Override
+  public ValidTxnWriteIdList getValidWriteIds(List<String> tableList,
+                                              String validTxnList) throws 
LockException {
+    assert isTxnOpen();
+    assert validTxnList != null && !validTxnList.isEmpty();
+    try {
+      return getMS().getValidWriteIds(txnId, tableList, validTxnList);
+    } catch (TException e) {
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
 
@@ -886,9 +915,25 @@ public final class DbTxnManager extends HiveTxnManagerImpl 
{
     return txnId;
   }
   @Override
-  public int getWriteIdAndIncrement() {
+  public int getStmtIdAndIncrement() {
     assert isTxnOpen();
-    return writeId++;
+    return stmtId++;
+  }
+
+  @Override
+  public long getTableWriteId(String dbName, String tableName) throws 
LockException {
+    assert isTxnOpen();
+    String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
+    if (tableWriteIds.containsKey(fullTableName)) {
+      return tableWriteIds.get(fullTableName);
+    }
+    try {
+      long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName);
+      tableWriteIds.put(fullTableName, writeId);
+      return writeId;
+    } catch (TException e) {
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws 
LockException {
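
The DbTxnManager changes split the old writeId counter into two notions: stmtId, a per-transaction statement counter, and per-table write ids that are allocated by the metastore and cached in tableWriteIds for the lifetime of the transaction. A self-contained sketch of that caching behaviour, assuming allocation goes through a metastore call like the patch's getMS().allocateTableWriteId(txnId, dbName, tableName); everything named below is hypothetical illustration, not Hive code:

    import java.util.HashMap;
    import java.util.Map;

    class TableWriteIdCacheSketch {
      private final Map<String, Long> tableWriteIds = new HashMap<>();
      private final long txnId;

      TableWriteIdCacheSketch(long txnId) {
        this.txnId = txnId;
      }

      // The first write to a table in this txn allocates a write id; later writes
      // to the same table reuse the cached value, mirroring getTableWriteId().
      long getTableWriteId(String dbName, String tableName) {
        String fullTableName = dbName + "." + tableName; // AcidUtils.getFullTableName in the patch
        return tableWriteIds.computeIfAbsent(fullTableName,
            key -> allocateTableWriteId(txnId, dbName, tableName));
      }

      // Stand-in for the metastore RPC used by the patch.
      private long allocateTableWriteId(long txnId, String dbName, String tableName) {
        return 1L; // placeholder value
      }

      // The cache is transaction-scoped: cleared on open, commit, rollback and abort.
      void onTransactionBoundary() {
        tableWriteIds.clear();
      }
    }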

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index fca6408..7413074 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -62,12 +63,15 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   public long getCurrentTxnId() {
     return 0L;
   }
-
   @Override
-  public int getWriteIdAndIncrement() {
+  public int getStmtIdAndIncrement() {
     return 0;
   }
   @Override
+  public long getTableWriteId(String dbName, String tableName) throws 
LockException {
+    return 0L;
+  }
+  @Override
   public HiveLockManager getLockManager() throws LockException {
     if (lockMgr == null) {
       boolean supportConcurrency =
@@ -220,6 +224,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public ValidTxnWriteIdList getValidWriteIds(List<String> tableList,
+                                              String validTxnList) throws 
LockException {
+    return new ValidTxnWriteIdList(getCurrentTxnId());
+  }
+
+  @Override
   public String getTxnManagerName() {
     return DummyTxnManager.class.getName();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 4f9f0c2..0db2a2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -122,8 +123,7 @@ public interface HiveTxnManager {
 
   /**
    * Get the transactions that are currently valid.  The resulting
-   * {@link ValidTxnList} object is a thrift object and can
-   * be  passed to  the processing
+   * {@link ValidTxnList} object can be passed as string to the processing
    * tasks for use in the reading the data.  This call should be made once up
    * front by the planner and should never be called on the backend,
    * as this will violate the isolation level semantics.
@@ -133,6 +133,18 @@ public interface HiveTxnManager {
   ValidTxnList getValidTxns() throws LockException;
 
   /**
+   * Get the table write Ids that are valid for the current transaction.  The 
resulting
+   * {@link ValidTxnWriteIdList} object can be passed as string to the 
processing
+   * tasks for use in the reading the data.  This call will return same 
results as long as validTxnString
+   * passed is same.
+   * @param tableList list of tables (<db_name>.<table_name>) read/written by 
current transaction.
+   * @param validTxnList snapshot of valid txns for the current txn
+   * @return list of valid table write Ids.
+   * @throws LockException
+   */
+  ValidTxnWriteIdList getValidWriteIds(List<String> tableList, String 
validTxnList) throws LockException;
+
+  /**
    * Get the name for currently installed transaction manager.
    * @return transaction manager name
    */
@@ -202,7 +214,7 @@ public interface HiveTxnManager {
   boolean useNewShowLocksFormat();
 
   /**
-   * Indicate whether this transaction manager supports ACID operations
+   * Indicate whether this transaction manager supports ACID operations.
    * @return true if this transaction manager does ACID
    */
   boolean supportsAcid();
@@ -217,14 +229,19 @@ public interface HiveTxnManager {
   
   boolean isTxnOpen();
   /**
-   * if {@code isTxnOpen()}, returns the currently active transaction ID
+   * if {@code isTxnOpen()}, returns the currently active transaction ID.
    */
   long getCurrentTxnId();
 
   /**
+   * if {@code isTxnOpen()}, returns the table write ID associated with 
current active transaction.
+   */
+  long getTableWriteId(String dbName, String tableName) throws LockException;
+
+  /**
    * Should be though of more as a unique write operation ID in a given txn 
(at QueryPlan level).
    * Each statement writing data within a multi statement txn should have a 
unique WriteId.
    * Even a single statement, (e.g. Merge, multi-insert may generates several 
writes).
    */
-  int getWriteIdAndIncrement();
+  int getStmtIdAndIncrement();
 }
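
The new getValidWriteIds call complements getValidTxns: the planner first captures the txn snapshot, then resolves per-table write-id snapshots for the tables the query reads or writes. A usage sketch against the interface as amended above; the caller and the table names are illustrative:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    class ValidWriteIdsCallSketch {
      // validTxnListString is the serialized ValidTxnList the planner captured earlier;
      // tables use the <db_name>.<table_name> form described in the javadoc.
      static ValidTxnWriteIdList writeIdSnapshot(HiveTxnManager txnMgr, String validTxnListString)
          throws LockException {
        List<String> tables = Arrays.asList("default.t1", "default.t2"); // illustrative names
        return txnMgr.getValidWriteIds(tables, validTxnListString);
      }
    }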

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9c3b54f..8b0af3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1497,17 +1497,19 @@ public class Hive {
    * @param isSrcLocal
    * @param isAcid
    * @param hasFollowingStatsTask
+   * @param writeId
+   * @param stmtId
    * @return
    * @throws HiveException
    */
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, LoadFileType loadFileType, boolean 
inheritTableSpecs,
       boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
-      boolean hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean hasFollowingStatsTask, Long writeId, int stmtId)
           throws HiveException {
     Table tbl = getTable(tableName);
     loadPartition(loadPath, tbl, partSpec, loadFileType, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, 
txnId, stmtId);
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, 
writeId, stmtId);
   }
 
   /**
@@ -1533,11 +1535,13 @@ public class Hive {
    *          true if this is an ACID operation Insert/Update/Delete operation
    * @param hasFollowingStatsTask
    *          true if there is a following task which updates the stats, so, 
this method need not update.
+   * @param writeId write ID allocated for the current load operation
+   * @param stmtId statement ID of the current load statement
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> 
partSpec,
       LoadFileType loadFileType, boolean inheritTableSpecs, boolean 
isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcidIUDoperation, boolean 
hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean 
hasFollowingStatsTask, Long writeId, int stmtId)
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
@@ -1596,7 +1600,7 @@ public class Hive {
         }
         assert !isAcidIUDoperation;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
-          newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
+          newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + 
oldPartPath
@@ -1608,12 +1612,12 @@ public class Hive {
         Path destPath = newPartPath;
         if (isMmTableWrite) {
           // We will load into MM directory, and delete from the parent if 
needed.
-          destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, 
stmtId));
+          destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, 
writeId, stmtId));
           filter = (loadFileType == LoadFileType.REPLACE_ALL)
-            ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
+            ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : 
filter;
         }
         else if(!isAcidIUDoperation && isFullAcidTable) {
-          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, 
stmtId, tbl);
+          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, 
writeId, stmtId, tbl);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + 
destPath);
@@ -1723,7 +1727,7 @@ public class Hive {
    * delta_x_x directory - same as any other Acid write.  This method modifies 
the destPath to add
    * this path component.
    * @param txnId - id of current transaction (in which this operation is 
running)
-   * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+   * @param stmtId - see {@link DbTxnManager#getStmtIdAndIncrement()}
    * @return appropriately modified path
    */
   private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path 
destPath, long txnId, int stmtId, Table tbl) throws HiveException {
@@ -1987,13 +1991,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param loadFileType
    * @param numDP number of dynamic partitions
    * @param isAcid true if this is an ACID operation
-   * @param txnId txnId, can be 0 unless isAcid == true
+   * @param writeId writeId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path 
loadPath,
       final String tableName, final Map<String, String> partSpec, final 
LoadFileType loadFileType,
-      final int numDP, final int numLB, final boolean isAcid, final long 
txnId, final int stmtId,
+      final int numDP, final int numLB, final boolean isAcid, final long 
writeId, final int stmtId,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation 
operation, boolean isInsertOverwrite)
       throws HiveException {
 
@@ -2009,7 +2013,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
-    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, 
loadPath, txnId, stmtId,
+    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, 
loadPath, writeId, stmtId,
         AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
 
     final int partsToLoad = validPartitions.size();
@@ -2044,7 +2048,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               // load the partition
               Partition newPartition = loadPartition(partPath, tbl, 
fullPartSpec,
                   loadFileType, true, numLB > 0,
-                  false, isAcid, hasFollowingStatsTask, txnId, stmtId);
+                  false, isAcid, hasFollowingStatsTask, writeId, stmtId);
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -2103,8 +2107,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
         for (Partition p : partitionsMap.values()) {
           partNames.add(p.getName());
         }
-        getMSC().addDynamicPartitions(txnId, tbl.getDbName(), 
tbl.getTableName(),
-          partNames, AcidUtils.toDataOperationType(operation));
+        
getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), 
writeId,
+                tbl.getDbName(), tbl.getTableName(), partNames,
+                AcidUtils.toDataOperationType(operation));
       }
       LOG.info("Loaded " + partitionsMap.size() + " partitions");
       return partitionsMap;
@@ -2134,10 +2139,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param hasFollowingStatsTask
    *          if there is any following stats task
    * @param isAcidIUDoperation true if this is an ACID based Insert 
[overwrite]/update/delete
+   * @param writeId write ID allocated for the current load operation
+   * @param stmtId statement ID of the current load statement
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType 
loadFileType, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean 
hasFollowingStatsTask,
-      Long txnId, int stmtId) throws HiveException {
+      Long writeId, int stmtId) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2150,7 +2157,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Note: this assumes both paths are qualified; which they are, currently.
     if (isMmTable && loadPath.equals(tbl.getPath())) {
       Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + 
tbl.getPath());
-      newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
+      newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external 
source.
       Path tblPath = tbl.getPath();
@@ -2159,12 +2166,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (isMmTable) {
         assert !isAcidIUDoperation;
         // We will load into MM directory, and delete from the parent if 
needed.
-        destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, 
stmtId));
+        destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, 
stmtId));
         filter = loadFileType == LoadFileType.REPLACE_ALL
-            ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
+            ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : 
filter;
       }
       else if(!isAcidIUDoperation && isFullAcidTable) {
-        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, 
stmtId, tbl);
+        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, 
stmtId, tbl);
       }
       Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
           + " (replace = " + loadFileType + ")");

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 3023144..4f396a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1270,8 +1270,8 @@ public final class GenMapRedUtils {
     FileSinkDesc fsInputDesc = fsInput.getConf();
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating merge work from " + 
System.identityHashCode(fsInput)
-        + " with write ID " + (fsInputDesc.isMmTable() ? 
fsInputDesc.getTransactionId() : null)
-        + " into " + finalName);
+          + " with write ID " + (fsInputDesc.isMmTable() ? 
fsInputDesc.getTableWriteId() : null)
+          + " into " + finalName);
     }
 
     boolean isBlockMerge = 
(conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
@@ -1280,7 +1280,7 @@ public final class GenMapRedUtils {
             
fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class));
 
     RowSchema inputRS = fsInput.getSchema();
-    Long srcMmWriteId = fsInputDesc.isMmTable() ? 
fsInputDesc.getTransactionId() : null;
+    Long srcMmWriteId = fsInputDesc.isMmTable() ? 
fsInputDesc.getTableWriteId() : null;
     FileSinkDesc fsOutputDesc = null;
     TableScanOperator tsMerge = null;
     if (!isBlockMerge) {
@@ -1665,7 +1665,7 @@ public final class GenMapRedUtils {
       fmd = new OrcFileMergeDesc();
     }
     fmd.setIsMmTable(fsInputDesc.isMmTable());
-    fmd.setTxnId(fsInputDesc.getTransactionId());
+    fmd.setWriteId(fsInputDesc.getTableWriteId());
     int stmtId = fsInputDesc.getStatementId();
     fmd.setStmtId(stmtId == -1 ? 0 : stmtId);
     fmd.setDpCtx(fsInputDesc.getDynPartCtx());

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 718faff..e926b63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1563,7 +1563,7 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
         truncateTblDesc.setOutputDir(queryTmpdir);
         LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-            partSpec == null ? new HashMap<>() : partSpec, null);
+            partSpec == null ? new HashMap<>() : partSpec);
         ltd.setLbCtx(lbCtx);
         @SuppressWarnings("unchecked")
         Task<MoveWork> moveTsk =
@@ -2017,7 +2017,7 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       mergeDesc.setOutputDir(queryTmpdir);
       // No need to handle MM tables - unsupported path.
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-          partSpec == null ? new HashMap<>() : partSpec, null);
+          partSpec == null ? new HashMap<>() : partSpec);
       ltd.setLbCtx(lbCtx);
       Task<MoveWork> moveTsk =
           TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5520bc2..67d05e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -318,31 +318,39 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     Table table = tableIfExists(tblDesc, x.getHive());
     boolean tableExists = false;
 
-    if (table != null){
+    if (table != null) {
       checkTable(table, tblDesc,replicationSpec, x.getConf());
       x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata 
checked");
       tableExists = true;
     }
 
-    Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+    Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
+    if (((table != null) && AcidUtils.isTransactionalTable(table))
+            || AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
+      // Explain plan doesn't open a txn and hence no need to allocate write 
id.
+      if (x.getCtx().getExplainConfig() == null) {
+        writeId = 
SessionState.get().getTxnMgr().getTableWriteId(tblDesc.getDatabaseName(), 
tblDesc.getTableName());
+      }
+    }
     int stmtId = 0;
+
     // TODO [MM gap?]: bad merge; tblDesc is no longer CreateTableDesc, but 
ImportTableDesc.
     //                 We need to verify the tests to see if this works 
correctly.
     /*
-    if (isAcid(txnId)) {
-      tblDesc.setInitialMmWriteId(txnId);
+    if (isAcid(writeId)) {
+      tblDesc.setInitialMmWriteId(writeId);
     }
     */
     if (!replicationSpec.isInReplicationScope()) {
       createRegularImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, table,
-          fromURI, fs, wh, x, txnId, stmtId, isSourceMm);
+          fromURI, fs, wh, x, writeId, stmtId, isSourceMm);
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
           replicationSpec, waitOnPrecursor, table,
-          fromURI, fs, wh, x, txnId, stmtId, isSourceMm, updatedMetadata);
+          fromURI, fs, wh, x, writeId, stmtId, isSourceMm, updatedMetadata);
     }
     return tableExists;
   }
@@ -377,13 +385,13 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, 
Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext 
x,
-      Long txnId, int stmtId, boolean isSourceMm) {
+      Long writeId, int stmtId, boolean isSourceMm) {
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path destPath = null, loadPath = null;
     LoadFileType lft;
     if (AcidUtils.isInsertOnlyTable(table)) {
-      String mmSubdir = replace ? AcidUtils.baseDir(txnId)
-          : AcidUtils.deltaSubdir(txnId, txnId, stmtId);
+      String mmSubdir = replace ? AcidUtils.baseDir(writeId)
+          : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
       destPath = new Path(tgtPath, mmSubdir);
       loadPath = tgtPath;
       lft = LoadFileType.KEEP_EXISTING;
@@ -395,13 +403,13 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("adding import work for table with source 
location: " +
-        dataPath + "; table: " + tgtPath + "; copy destination " + destPath + 
"; mm " + txnId +
+        dataPath + "; table: " + tgtPath + "; copy destination " + destPath + 
"; mm " + writeId +
         " (src " + isSourceMm + ") for " + (table == null ? "a new table" : 
table.getTableName()));
     }
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      if (isSourceMm || isAcid(txnId)) {
+      if (isSourceMm || isAcid(writeId)) {
         // Note: this is replication gap, not MM gap... Repl V2 is not ready 
yet.
         throw new RuntimeException("Replicating MM and ACID tables is not 
supported");
       }
@@ -413,7 +421,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     }
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
-        loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, txnId);
+        loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, 
writeId);
     loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, 
null, false);
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -423,11 +431,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   }
 
   /**
-   * todo: this is odd: transactions are opened for all statements.  what is 
this supposed to check?
+   * todo: this is odd: write id allocated for all write operations on ACID 
tables.  what is this supposed to check?
    */
   @Deprecated
-  private static boolean isAcid(Long txnId) {
-    return (txnId != null) && (txnId != 0);
+  private static boolean isAcid(Long writeId) {
+    return (writeId != null) && (writeId != 0);
   }
 
   private static Task<?> createTableTask(ImportTableDesc tableDesc, 
EximUtil.SemanticAnalyzerWrapperContext x){
@@ -467,7 +475,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, 
ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, 
ReplicationSpec replicationSpec,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, 
boolean isSourceMm,
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, 
boolean isSourceMm,
       Task<?> commitTask)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = 
addPartitionDesc.getPartition(0);
@@ -487,18 +495,18 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
       Path destPath = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? 
x.getCtx().getExternalTmpPath(tgtLocation)
-          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+          : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, 
stmtId));
       Path moveTaskSrc =  !AcidUtils.isInsertOnlyTable(table.getParameters()) 
? destPath : tgtLocation;
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("adding import work for partition with 
source location: "
           + srcLocation + "; target: " + tgtLocation + "; copy dest " + 
destPath + "; mm "
-          + txnId + " (src " + isSourceMm + ") for " + 
partSpecToString(partSpec.getPartSpec()));
+          + writeId + " (src " + isSourceMm + ") for " + 
partSpecToString(partSpec.getPartSpec()));
       }
 
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        if (isSourceMm || isAcid(txnId)) {
+        if (isSourceMm || isAcid(writeId)) {
           // Note: this is replication gap, not MM gap... Repl V2 is not ready 
yet.
           throw new RuntimeException("Replicating MM and ACID tables is not 
supported");
         }
@@ -515,7 +523,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, 
Utilities.getTableDesc(table),
           partSpec.getPartSpec(),
           replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : 
LoadFileType.OVERWRITE_EXISTING,
-          txnId);
+              writeId);
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
@@ -814,14 +822,14 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   private static void createRegularImportTasks(
       ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean 
isPartSpecSet,
       ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem 
fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, 
boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, 
boolean isSourceMm)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
         Task<?> ict = createImportCommitTask(
-            table.getDbName(), table.getTableName(), txnId, stmtId, 
x.getConf(),
+            table.getDbName(), table.getTableName(), writeId, stmtId, 
x.getConf(),
             AcidUtils.isInsertOnlyTable(table.getParameters()));
 
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
@@ -829,7 +837,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == 
null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm, ict));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -841,7 +849,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, 
stmtId, isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, 
stmtId, isSourceMm);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, 
WriteEntity.WriteType.DDL_NO_LOCK));
@@ -858,11 +866,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       if (isPartitioned(tblDesc)) {
         Task<?> ict = createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, 
x.getConf(),
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, 
stmtId, x.getConf(),
             AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, 
wh, addPartitionDesc,
-            replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+            replicationSpec, x, writeId, stmtId, isSourceMm, ict));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -885,7 +893,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             tblproperties.put("transactional_properties", "insert_only");
             table.setParameters(tblproperties);
           }
-          t.addDependentTask(loadTable(fromURI, table, false, tablePath, 
replicationSpec, x, txnId, stmtId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, false, tablePath, 
replicationSpec, x, writeId, stmtId, isSourceMm));
         }
       }
       x.getTasks().add(t);
@@ -893,11 +901,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   }
 
   private static Task<?> createImportCommitTask(
-      String dbName, String tblName, Long txnId, int stmtId, HiveConf conf, 
boolean isMmTable) {
+      String dbName, String tblName, Long writeId, int stmtId, HiveConf conf, 
boolean isMmTable) {
     // TODO: noop, remove?
     @SuppressWarnings("unchecked")
     Task<ImportCommitWork> ict = (!isMmTable) ? null : TaskFactory.get(
-        new ImportCommitWork(dbName, tblName, txnId, stmtId), conf);
+        new ImportCommitWork(dbName, tblName, writeId, stmtId), conf);
     return ict;
   }
 
@@ -909,7 +917,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       List<AddPartitionDesc> partitionDescs,
       ReplicationSpec replicationSpec, boolean waitOnPrecursor,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, 
boolean isSourceMm,
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, 
boolean isSourceMm,
       UpdatedMetaDataTracker updatedMetadata)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
@@ -986,19 +994,19 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       if (!replicationSpec.isMetadataOnly()) {
         if (isPartitioned(tblDesc)) {
           Task<?> ict = createImportCommitTask(
-              tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, 
stmtId, x.getConf(),
+              tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, 
stmtId, x.getConf(),
               AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, 
addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+                addSinglePartition(fromURI, fs, tblDesc, table, wh, 
addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
             if (updatedMetadata != null) {
               
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
             }
           }
         } else {
           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new 
Path(tblDesc.getLocation()), replicationSpec, x, txnId, stmtId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, true, new 
Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
         }
       }
       // Simply create
@@ -1012,12 +1020,12 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           Map<String, String> partSpec = 
addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           Task<?> ict = replicationSpec.isMetadataOnly() ? null : 
createImportCommitTask(
-              tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, 
stmtId, x.getConf(),
+              tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, 
stmtId, x.getConf(),
               AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == 
null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm, ict));
               if (updatedMetadata != null) {
                 
updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
               }
@@ -1034,7 +1042,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             if (replicationSpec.allowReplacementInto(ptn.getParameters())){
               if (!replicationSpec.isMetadataOnly()){
                 x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, x, writeId, stmtId, isSourceMm, ict));
               } else {
                 x.getTasks().add(alterSinglePartition(
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, 
replicationSpec, ptn, x));
@@ -1060,7 +1068,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
           loadTable(fromURI, table, replicationSpec.isReplace(), new 
Path(fromURI),
-            replicationSpec, x, txnId, stmtId, isSourceMm);
+            replicationSpec, x, writeId, stmtId, isSourceMm);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }
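
For illustration, a minimal sketch of the predicates the import path now keys its write-id handling on: any transactional target is written under a table write id, and only insert-only (MM) targets get the extra import-commit task. Only helpers visible in the hunks above are used; the wrapper class itself is not part of the patch.

import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;

final class ImportWriteIdSketch {
  // True when the target must be written under an allocated table write id.
  static boolean needsWriteId(Table target) {
    return AcidUtils.isTransactionalTable(target);
  }
  // createImportCommitTask(...) above only returns a task for insert-only (MM) tables.
  static boolean needsMmCommitTask(Table target) {
    return AcidUtils.isInsertOnlyTable(target.getParameters());
  }
}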

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc66936..7d2de75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -316,17 +317,22 @@ public class LoadSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
     }
 
-    Long txnId = null;
+    Long writeId = null;
     int stmtId = -1;
     if (AcidUtils.isTransactionalTable(ts.tableHandle)) {
-      txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
-      stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
+      try {
+        writeId = 
SessionState.get().getTxnMgr().getTableWriteId(ts.tableHandle.getDbName(),
+                ts.tableHandle.getTableName());
+      } catch (LockException ex) {
+        throw new SemanticException("Failed to allocate the write id", ex);
+      }
+      stmtId = SessionState.get().getTxnMgr().getStmtIdAndIncrement();
     }
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
       Utilities.getTableDesc(ts.tableHandle), partSpec,
-      isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, 
txnId);
+      isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, 
writeId);
     loadTableWork.setStmtId(stmtId);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
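
For context, the allocation pattern this change introduces, shown as a standalone sketch: for a transactional target, LOAD asks the transaction manager for a table-level write id plus a statement id instead of reusing the transaction id. An open transaction on the current session is assumed, and the println stands in for wiring the values into LoadTableDesc.

import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;

final class WriteIdAllocationSketch {
  static void allocateFor(String dbName, String tblName) throws SemanticException {
    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
    try {
      Long writeId = txnMgr.getTableWriteId(dbName, tblName); // scoped to the open txn and this table
      int stmtId = txnMgr.getStmtIdAndIncrement();            // distinguishes writes within one txn
      System.out.println("writeId=" + writeId + ", stmtId=" + stmtId);
    } catch (LockException ex) {
      throw new SemanticException("Failed to allocate the write id", ex);
    }
  }
}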

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 19fc6a9..cd6f1ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -131,6 +131,8 @@ import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -6801,7 +6803,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
     boolean isMmTable = false, isMmCtas = false;
-    Long txnId = null;
+    Long writeId = null;
+    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
@@ -6884,15 +6887,23 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           //todo: should this be done for MM?  is it ok to use 
CombineHiveInputFormat with MM
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
-        if (isMmTable) {
-          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
-        } else {
-          txnId = acidOp == Operation.NOT_ACID ? null :
-              SessionState.get().getTxnMgr().getCurrentTxnId();
+        try {
+          if (ctx.getExplainConfig() != null) {
+            writeId = 0L; // For explain plan, txn won't be opened and doesn't 
make sense to allocate write id
+          } else {
+            if (isMmTable) {
+              writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), 
dest_tab.getTableName());
+            } else {
+              writeId = acidOp == Operation.NOT_ACID ? null :
+                      txnMgr.getTableWriteId(dest_tab.getDbName(), 
dest_tab.getTableName());
+            }
+          }
+        } catch (LockException ex) {
+          throw new SemanticException("Failed to allocate write Id", ex);
         }
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
             dest_tab.getDbName(), dest_tab.getTableName());
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, 
isReplace, txnId);
+        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, 
isReplace, writeId);
         // For Acid table, Insert Overwrite shouldn't replace the table 
content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
         LoadFileType loadType = 
(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@ -6967,13 +6978,21 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         //todo: should this be done for MM?  is it ok to use 
CombineHiveInputFormat with MM?
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
-      if (isMmTable) {
-        txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
-      } else {
-        txnId = (acidOp == Operation.NOT_ACID) ? null :
-            SessionState.get().getTxnMgr().getCurrentTxnId();
+      try {
+        if (ctx.getExplainConfig() != null) {
+          writeId = 0L; // For explain plan, txn won't be opened and doesn't 
make sense to allocate write id
+        } else {
+          if (isMmTable) {
+            writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), 
dest_tab.getTableName());
+          } else {
+            writeId = (acidOp == Operation.NOT_ACID) ? null :
+                    txnMgr.getTableWriteId(dest_tab.getDbName(), 
dest_tab.getTableName());
+          }
+        }
+      } catch (LockException ex) {
+        throw new SemanticException("Failed to allocate write Id", ex);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp, txnId);
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp, writeId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. 
We keep the old
       // deltas and base and leave them up to the cleaner to clean up
       LoadFileType loadType = 
(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@ -7011,8 +7030,16 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         destTableIsMaterialization = tblDesc.isMaterialization();
         if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
           isMmTable = isMmCtas = true;
-          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
-          tblDesc.setInitialMmWriteId(txnId);
+          try {
+            if (ctx.getExplainConfig() != null) {
+              writeId = 0L; // For explain plan, txn won't be opened and 
doesn't make sense to allocate write id
+            } else {
+              writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), 
tblDesc.getTableName());
+            }
+          } catch (LockException ex) {
+            throw new SemanticException("Failed to allocate write Id", ex);
+          }
+          tblDesc.setInitialMmWriteId(writeId);
         }
       } else if (viewDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
@@ -7156,7 +7183,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
         dest_path, currentTableId, destTableIsFullAcid, 
destTableIsTemporary,//this was 1/4 acid
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, dest_tab, txnId, isMmCtas, dest_type, qb);
+        canBeMerged, dest_tab, writeId, isMmCtas, dest_type, qb);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to 
avoid the move.
       tableDesc.setWriter(fileSinkDesc);
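
The same allocation in SemanticAnalyzer gains an EXPLAIN special case: when an explain config is present no transaction is opened, so the write id is pinned to 0 rather than requested from the transaction manager. A condensed sketch of that decision, with the analyzer's local state passed in as parameters:

import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;

final class DestWriteIdSketch {
  // Condensed form of the logic added to both the DEST_TABLE and DEST_PARTITION paths.
  static Long resolve(Context ctx, HiveTxnManager txnMgr, Table destTab,
      boolean isMmTable, AcidUtils.Operation acidOp) throws SemanticException {
    try {
      if (ctx.getExplainConfig() != null) {
        return 0L;                              // EXPLAIN: no txn, so no real write id
      }
      if (isMmTable || acidOp != AcidUtils.Operation.NOT_ACID) {
        return txnMgr.getTableWriteId(destTab.getDbName(), destTab.getTableName());
      }
      return null;                              // plain non-ACID write needs no write id
    } catch (LockException ex) {
      throw new SemanticException("Failed to allocate write Id", ex);
    }
  }
}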

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
index 4da868c..6bd0053 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
@@ -28,7 +28,7 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   private int listBucketingDepth;
   private boolean hasDynamicPartitions;
   private boolean isListBucketingAlterTableConcatenate;
-  private Long txnId;
+  private Long writeId;
   private int stmtId;
   private boolean isMmTable;
 
@@ -77,12 +77,12 @@ public class FileMergeDesc extends AbstractOperatorDesc {
     this.isListBucketingAlterTableConcatenate = 
isListBucketingAlterTableConcatenate;
   }
 
-  public Long getTxnId() {
-    return txnId;
+  public Long getWriteId() {
+    return writeId;
   }
 
-  public void setTxnId(Long txnId) {
-    this.txnId = txnId;
+  public void setWriteId(Long writeId) {
+    this.writeId = writeId;
   }
 
   public int getStmtId() {

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 92b8031..ce61fc5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -90,7 +90,7 @@ public class FileSinkDesc extends AbstractOperatorDesc 
implements IStatsGatherDe
 
   // Record what type of write this is.  Default is non-ACID (ie old style).
   private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
-  private long txnId = 0;  // transaction id for this operation
+  private long tableWriteId = 0;  // table write id for this operation
   private int statementId = -1;
 
   private transient Table table;
@@ -167,7 +167,7 @@ public class FileSinkDesc extends AbstractOperatorDesc 
implements IStatsGatherDe
     ret.setStatsReliable(statsReliable);
     ret.setDpSortState(dpSortState);
     ret.setWriteType(writeType);
-    ret.setTransactionId(txnId);
+    ret.setTableWriteId(tableWriteId);
     ret.setStatementId(statementId);
     ret.setStatsTmpDir(statsTmpDir);
     ret.setIsMerge(isMerge);
@@ -207,7 +207,7 @@ public class FileSinkDesc extends AbstractOperatorDesc 
implements IStatsGatherDe
   public Path getMergeInputDirName() {
     Path root = getFinalDirName();
     if (isMmTable()) {
-      return new Path(root, AcidUtils.deltaSubdir(txnId, txnId, statementId));
+      return new Path(root, AcidUtils.deltaSubdir(tableWriteId, tableWriteId, 
statementId));
     } else {
       return root;
     }
@@ -483,11 +483,11 @@ public class FileSinkDesc extends AbstractOperatorDesc 
implements IStatsGatherDe
   public String getWriteTypeString() {
     return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : 
getWriteType().toString();
   }
-  public void setTransactionId(long id) {
-    txnId = id;
+  public void setTableWriteId(long id) {
+    tableWriteId = id;
   }
-  public long getTransactionId() {
-    return txnId;
+  public long getTableWriteId() {
+    return tableWriteId;
   }
 
   public void setStatementId(int id) {
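
Since getMergeInputDirName() above now keys the MM delta directory off the table write id, here is a small sketch of deriving that path outside the class; the delta directory name itself comes from AcidUtils.deltaSubdir(writeId, writeId, statementId), and no assumption is made here about its exact zero-padded format.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

final class MergeInputPathSketch {
  // Mirrors getMergeInputDirName(): MM tables write into a per-(writeId, stmtId) delta dir,
  // everything else merges directly under the final directory.
  static Path mergeInputDir(Path finalDir, boolean isMmTable, long tableWriteId, int statementId) {
    return isMmTable
        ? new Path(finalDir, AcidUtils.deltaSubdir(tableWriteId, tableWriteId, statementId))
        : finalDir;
  }
}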

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index a40c486..f15b3c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -39,7 +39,7 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
   private boolean inheritTableSpecs = true; //For partitions, flag controlling 
whether the current
                                             //table specs are to be used
   private int stmtId;
-  private Long currentTransactionId;
+  private Long currentWriteId;
   private boolean isInsertOverwrite;
 
   // TODO: the below seem like they should just be combined into partitionDesc
@@ -71,7 +71,7 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
     this.dpCtx = o.dpCtx;
     this.lbCtx = o.lbCtx;
     this.inheritTableSpecs = o.inheritTableSpecs;
-    this.currentTransactionId = o.currentTransactionId;
+    this.currentWriteId = o.currentWriteId;
     this.table = o.table;
     this.partitionSpec = o.partitionSpec;
   }
@@ -80,13 +80,13 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
       final TableDesc table,
       final Map<String, String> partitionSpec,
       final LoadFileType loadFileType,
-      final AcidUtils.Operation writeType, Long currentTransactionId) {
+      final AcidUtils.Operation writeType, Long currentWriteId) {
     super(sourcePath, writeType);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("creating part LTD from " + sourcePath + 
" to "
         + ((table.getProperties() == null) ? "null" : table.getTableName()));
     }
-    init(table, partitionSpec, loadFileType, currentTransactionId);
+    init(table, partitionSpec, loadFileType, currentWriteId);
   }
 
   /**
@@ -95,21 +95,22 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
    * @param table
    * @param partitionSpec
    * @param loadFileType
+   * @param writeId
    */
   public LoadTableDesc(final Path sourcePath,
                        final TableDesc table,
                        final Map<String, String> partitionSpec,
                        final LoadFileType loadFileType,
-                       final Long txnId) {
-    this(sourcePath, table, partitionSpec, loadFileType, 
AcidUtils.Operation.NOT_ACID, txnId);
+                       final Long writeId) {
+    this(sourcePath, table, partitionSpec, loadFileType, 
AcidUtils.Operation.NOT_ACID, writeId);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec,
-      final AcidUtils.Operation writeType, Long currentTransactionId) {
+      final AcidUtils.Operation writeType, Long currentWriteId) {
     this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
-            writeType, currentTransactionId);
+            writeType, currentWriteId);
   }
 
   /**
@@ -120,16 +121,16 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
    */
   public LoadTableDesc(final Path sourcePath,
                        final org.apache.hadoop.hive.ql.plan.TableDesc table,
-                       final Map<String, String> partitionSpec, Long txnId) {
+                       final Map<String, String> partitionSpec) {
     this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
-      AcidUtils.Operation.NOT_ACID, txnId);
+      AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final DynamicPartitionCtx dpCtx,
       final AcidUtils.Operation writeType,
-      boolean isReplace, Long txnId) {
+      boolean isReplace, Long writeId) {
     super(sourcePath, writeType);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("creating LTD from " + sourcePath + " to 
" + table.getTableName());
@@ -137,9 +138,9 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
     this.dpCtx = dpCtx;
     LoadFileType lft = isReplace ?  LoadFileType.REPLACE_ALL :  
LoadFileType.OVERWRITE_EXISTING;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) 
{
-      init(table, dpCtx.getPartSpec(), lft, txnId);
+      init(table, dpCtx.getPartSpec(), lft, writeId);
     } else {
-      init(table, new LinkedHashMap<String, String>(), lft, txnId);
+      init(table, new LinkedHashMap<String, String>(), lft, writeId);
     }
   }
 
@@ -147,11 +148,11 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final LoadFileType loadFileType,
-      Long txnId) {
+      Long writeId) {
     this.table = table;
     this.partitionSpec = partitionSpec;
     this.loadFileType = loadFileType;
-    this.currentTransactionId = txnId;
+    this.currentWriteId = writeId;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, 
Level.EXTENDED })
@@ -232,8 +233,8 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
     this.lbCtx = lbCtx;
   }
 
-  public long getTxnId() {
-    return currentTransactionId == null ? 0 : currentTransactionId;
+  public long getWriteId() {
+    return currentWriteId == null ? 0 : currentWriteId;
   }
 
   public int getStmtId() {
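
Usage-wise, the constructors above now carry a write id end to end, and getWriteId() maps a missing id to 0. A sketch of the two common cases, assuming LoadFileType is the nested enum referenced (unqualified) in the hunks and that the surrounding plan objects already exist:

import java.util.LinkedHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.TableDesc;

final class LoadTableDescSketch {
  static void build(Path sourcePath, TableDesc tableDesc, long writeId) {
    // Non-transactional load: no write id; getWriteId() reports 0.
    LoadTableDesc plain = new LoadTableDesc(sourcePath, tableDesc,
        new LinkedHashMap<String, String>(), LoadFileType.KEEP_EXISTING, null);
    // Transactional load: the allocated table write id travels with the plan.
    LoadTableDesc acid = new LoadTableDesc(sourcePath, tableDesc,
        new LinkedHashMap<String, String>(), LoadFileType.REPLACE_ALL,
        AcidUtils.Operation.INSERT, writeId);
    System.out.println(plain.getWriteId() + " vs " + acid.getWriteId());
  }
}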

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 50ceba5..59968fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -102,6 +102,9 @@ public class TableScanDesc extends AbstractOperatorDesc 
implements IStatsGatherD
   // input file name (big) to bucket number
   private Map<String, Integer> bucketFileNameMapping;
 
+  private String dbName = null;
+  private String tableName = null;
+
   private boolean isMetadataOnly = false;
 
   private boolean isTranscationalTable;
@@ -135,6 +138,11 @@ public class TableScanDesc extends AbstractOperatorDesc 
implements IStatsGatherD
     this.alias = alias;
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
+
+    if (tblMetadata != null) {
+      dbName = tblMetadata.getDbName();
+      tableName = tblMetadata.getTableName();
+    }
     isTranscationalTable = AcidUtils.isTransactionalTable(this.tableMetadata);
     if (isTranscationalTable) {
       acidOperationalProperties = 
AcidUtils.getAcidOperationalProperties(this.tableMetadata);
@@ -154,12 +162,12 @@ public class TableScanDesc extends AbstractOperatorDesc 
implements IStatsGatherD
 
   @Explain(displayName = "table", jsonOnly = true)
   public String getTableName() {
-    return this.tableMetadata.getTableName();
+    return this.tableName;
   }
 
   @Explain(displayName = "database", jsonOnly = true)
   public String getDatabaseName() {
-    return this.tableMetadata.getDbName();
+    return this.dbName;
   }
 
   @Explain(displayName = "columns", jsonOnly = true)
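
The cached dbName/tableName make the jsonOnly EXPLAIN fields independent of the live Table object; a small sketch of the observable effect (the exact constructor signature is inferred from the assignments in the hunk above):

import java.util.Collections;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;

final class TableScanDescSketch {
  static void show(Table tblMetadata) {
    TableScanDesc tsd = new TableScanDesc("t1", Collections.<VirtualColumn>emptyList(), tblMetadata);
    // Both getters now return the names captured at construction time instead of
    // re-reading tableMetadata on every call.
    System.out.println(tsd.getDatabaseName() + "." + tsd.getTableName());
  }
}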

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
index 85917e4..7591c06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.stats.ColumnStatisticsObjTranslator;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 02097c8..31f50fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -23,8 +23,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -243,10 +243,10 @@ public class Cleaner extends CompactorThread {
 
       /**
        * Each Compaction only compacts as far as the highest txn id such that 
all txns below it
-       * are resolved (i.e. not opened).  This is what "highestTxnId" tracks.  
This is only tracked
-       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorTxnList and 
uses for more info.
+       * are resolved (i.e. not opened).  This is what "highestWriteId" 
tracks.  This is only tracked
+       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorWriteIdList 
and uses for more info.
        * 
-       * We only want to clean up to the highestTxnId - otherwise we risk 
deleteing deltas from
+       * We only want to clean up to the highestWriteId - otherwise we risk 
deleteing deltas from
        * under an active reader.
        * 
        * Suppose we have deltas D2 D3 for table T, i.e. the last compaction 
created D3 so now there is a 
@@ -255,10 +255,11 @@ public class Cleaner extends CompactorThread {
        * Between that check and removeFiles() a query starts (it will be 
reading D3) and another compaction
        * completes which creates D4.
        * Now removeFiles() (more specifically AcidUtils.getAcidState()) will 
declare D3 to be obsolete
-       * unless ValidTxnList is "capped" at highestTxnId.
+       * unless ValidTxnList is "capped" at highestWriteId.
        */
-      final ValidTxnList txnList = ci.highestTxnId > 0 ? 
-        new ValidReadTxnList(new long[0], new BitSet(), ci.highestTxnId) : new 
ValidReadTxnList();
+      final ValidWriteIdList txnList = (ci.highestWriteId > 0)
+              ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], 
new BitSet(), ci.highestWriteId)
+              : new ValidReaderWriteIdList();
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
@@ -288,8 +289,8 @@ public class Cleaner extends CompactorThread {
     }
   }
 
-  private void removeFiles(String location, ValidTxnList txnList) throws 
IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, 
txnList);
+  private void removeFiles(String location, ValidWriteIdList writeIdList) 
throws IOException {
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, 
writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
     List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
     for (FileStatus stat : obsoleteDirs) {
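
Putting the Cleaner change together: the reader list is capped at the compaction's highest write id so that getAcidState() never reports a newer delta as obsolete while a concurrent reader may still need it. A sketch of that construction with placeholder values (the full table name string is illustrative):

import java.io.IOException;
import java.util.BitSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;

final class CleanerSketch {
  static void listObsolete(Configuration conf, Path location, long highestWriteId) throws IOException {
    // Cap at highestWriteId: deltas written after the compaction's high-water mark must
    // survive, since an active reader may still depend on them.
    ValidWriteIdList writeIds = (highestWriteId > 0)
        ? new ValidReaderWriteIdList("db.tbl", new long[0], new BitSet(), highestWriteId)
        : new ValidReaderWriteIdList();
    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, writeIds);
    System.out.println("obsolete dirs: " + dir.getObsolete());
  }
}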
