http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 334cb31..6261a14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -80,7 +80,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -525,6 +528,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final ValidTxnList transactionList;
     private SplitStrategyKind splitStrategyKind;
     private final SearchArgument sarg;
+    private final AcidOperationalProperties acidOperationalProperties;
 
     Context(Configuration conf) throws IOException {
       this(conf, 1, null);
@@ -606,6 +610,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
       String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
       transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
+
+      // Determine the transactional_properties of the table from the job conf stored in context.
+      // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
+      // & therefore we should be able to retrieve them here and determine appropriate behavior.
+      // Note that this will be meaningless for non-acid tables & will be set to null.
+      boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+      String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+      this.acidOperationalProperties = isTableTransactional ?
+          AcidOperationalProperties.parseString(transactionalProperties) : null;
     }
 
     @VisibleForTesting
@@ -639,17 +652,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   @VisibleForTesting
   static final class AcidDirInfo {
     public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
-        List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+        List<AcidBaseFileInfo> baseFiles,
+        List<ParsedDelta> parsedDeltas) {
       this.splitPath = splitPath;
       this.acidInfo = acidInfo;
-      this.baseOrOriginalFiles = baseOrOriginalFiles;
+      this.baseFiles = baseFiles;
       this.fs = fs;
+      this.parsedDeltas = parsedDeltas;
     }
 
     final FileSystem fs;
     final Path splitPath;
     final AcidUtils.Directory acidInfo;
-    final List<HdfsFileStatusWithId> baseOrOriginalFiles;
+    final List<AcidBaseFileInfo> baseFiles;
+    final List<ParsedDelta> parsedDeltas;
   }
 
   @VisibleForTesting
@@ -672,7 +688,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail,
         List<OrcProto.Type> readerTypes, boolean isOriginal, List<DeltaMetaData> deltas,
         boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException {
-      super(dir, context.numBuckets, deltas, covered);
+      super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
       this.context = context;
       this.fs = fs;
       this.fileWithId = fileWithId;
@@ -916,7 +932,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public BISplitStrategy(Context context, FileSystem fs,
         Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
         List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) {
-      super(dir, context.numBuckets, deltas, covered);
+      super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
       this.fileStatuses = fileStatuses;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
@@ -964,20 +980,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<DeltaMetaData> deltas;
     boolean[] covered;
     int numBuckets;
+    AcidOperationalProperties acidOperationalProperties;
 
-    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
+    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+        AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
       this.numBuckets = numBuckets;
       this.deltas = deltas;
       this.covered = covered;
+      this.acidOperationalProperties = acidOperationalProperties;
     }
 
     @Override
     public List<OrcSplit> getSplits() throws IOException {
+      List<OrcSplit> splits = Lists.newArrayList();
+
+      // When split-update is enabled, we do not need to account for buckets that aren't covered.
+      // This is a huge performance benefit of split-update, and we can do so because the 'deltas'
+      // here are actually only the delete_deltas. All the insert_deltas with valid user payload
+      // data have already been considered as base for the covered buckets.
+      // Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
+      if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
+        return splits; // return an empty list.
+      }
+
       // Generate a split for any buckets that weren't covered.
       // This happens in the case where a bucket just has deltas and no
       // base.
-      List<OrcSplit> splits = Lists.newArrayList();
       if (!deltas.isEmpty()) {
         for (int b = 0; b < numBuckets; ++b) {
           if (!covered[b]) {
@@ -1032,13 +1061,70 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private AcidDirInfo callInternal() throws IOException {
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
-          context.conf, context.transactionList, useFileIds, true);
+      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
+          context.transactionList, useFileIds, true);
       Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
-      List<HdfsFileStatusWithId> children = (base == null)
-          ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds);
-      return new AcidDirInfo(fs, dir, dirInfo, children);
+      List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
+      if (base == null) {
+        for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
+          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
+        }
+      } else {
+        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+        for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
+          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+        }
+      }
+
+      // Find the parsed deltas- some of them containing only the insert delta events
+      // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
+      List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+
+      if (context.acidOperationalProperties != null &&
+          context.acidOperationalProperties.isSplitUpdate()) {
+        // If we have split-update turned on for this table, then the delta events have already been
+        // split into two directories- delta_x_y/ and delete_delta_x_y/.
+        // When split-update is turned on, the insert events go to the delta_x_y/ directory and all
+        // the delete events go to delete_delta_x_y/. An update event will generate two events-
+        // a delete event for the old record that is put into delete_delta_x_y/,
+        // followed by an insert event for the updated record put into the usual delta_x_y/.
+        // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/
+        // can be treated like base files. Hence, each of these is added to the baseFiles list.
+
+        for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) {
+          if (parsedDelta.isDeleteDelta()) {
+            parsedDeltas.add(parsedDelta);
+          } else {
+            // This is a normal insert delta, which only has insert events and hence all the files
+            // in this delta directory can be considered as a base.
+            if (useFileIds) {
+              try {
+                List<HdfsFileStatusWithId> insertDeltaFiles =
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+                for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
+                  baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+                }
+                continue; // move on to process the next parsedDelta.
+              } catch (Throwable t) {
+                LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+              }
+            }
+            // Fall back to regular API and create statuses without ID.
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+            for (FileStatus child : children) {
+              HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
+              baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+            }
+          }
+        }
+
+      } else {
+        // When split-update is not enabled, then all the deltas in the current directories
+        // should be considered as usual.
+        parsedDeltas.addAll(dirInfo.getCurrentDirectories());
+      }
+      return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
     }
 
     private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1526,26 +1612,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           continue;
         }
 
-        // We have received a new directory information, make a split strategy.
+        // We have received new directory information, so make split strategies.
         --resultsLeft;
-        SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs,
-            adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi,
+
+        // The reason we can get a list of split strategies here is that for the ACID split-update
+        // case, when we have a mix of original base files & insert deltas, we will produce two
+        // independent split strategies for them. There is a global flag 'isOriginal' that is set
+        // on a per split strategy basis and it has to be the same for all the files in that strategy.
+        List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
+            adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
             allowSyntheticFileIds);
-        if (splitStrategy == null) continue; // Combined.
 
-        if (isDebugEnabled) {
-          LOG.debug("Split strategy: {}", splitStrategy);
-        }
+        for (SplitStrategy<?> splitStrategy : splitStrategies) {
+          if (isDebugEnabled) {
+            LOG.debug("Split strategy: {}", splitStrategy);
+          }
 
-        // Hack note - different split strategies return differently typed lists, yay Java.
-        // This works purely by magic, because we know which strategy produces which type.
-        if (splitStrategy instanceof ETLSplitStrategy) {
-          scheduleSplits((ETLSplitStrategy)splitStrategy,
-              context, splitFutures, strategyFutures, splits);
-        } else {
-          @SuppressWarnings("unchecked")
-          List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
-          splits.addAll(readySplits);
+          // Hack note - different split strategies return differently typed lists, yay Java.
+          // This works purely by magic, because we know which strategy produces which type.
+          if (splitStrategy instanceof ETLSplitStrategy) {
+            scheduleSplits((ETLSplitStrategy)splitStrategy,
+                context, splitFutures, strategyFutures, splits);
+          } else {
+            @SuppressWarnings("unchecked")
+            List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
+            splits.addAll(readySplits);
+          }
         }
       }
 
@@ -1763,6 +1855,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
+
     Path root;
     if (split.hasBase()) {
       if (split.isOriginal()) {
@@ -1773,7 +1866,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     } else {
       root = path;
     }
-    final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
+
+    // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
+    AcidUtils.AcidOperationalProperties acidOperationalProperties
+            = AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+
+    // The deltas are decided based on whether split-update has been turned on for the table or not.
+    // When split-update is turned off, everything in the delta_x_y/ directory should be treated
+    // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory
+    // need to be considered as delta, because files in delta_x_y/ will be processed as base files
+    // since they only have insert events in them.
+    final Path[] deltas =
+        acidOperationalProperties.isSplitUpdate() ?
+            AcidUtils.deserializeDeleteDeltas(root, split.getDeltas())
+            : AcidUtils.deserializeDeltas(root, split.getDeltas());
     final Configuration conf = options.getConfiguration();
 
 
@@ -1793,7 +1899,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
 
     if (split.hasBase()) {
-      bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
+      bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
           .getBucket();
       OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
           .maxLength(split.getFileLength());
@@ -1948,23 +2054,76 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   @VisibleForTesting
+  static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context,
+      FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+      List<AcidBaseFileInfo> baseFiles,
+      List<ParsedDelta> parsedDeltas,
+      List<OrcProto.Type> readerTypes,
+      UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+    List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
+    SplitStrategy<?> splitStrategy;
+
+    // When there are no baseFiles, we will just generate a single split strategy and return.
+    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+    if (baseFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+      return splitStrategies; // return here
+    }
+
+    List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+    // Separate the base files into acid schema and non-acid(original) schema files.
+    for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
+      if (acidBaseFileInfo.isOriginal()) {
+        originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+      } else {
+        acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+      }
+    }
+
+    // Generate split strategy for non-acid schema original files, if any.
+    if (!originalSchemaFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+    }
+
+    // Generate split strategy for acid schema files, if any.
+    if (!acidSchemaFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+    }
+
+    return splitStrategies;
+  }
+
+  @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
-      List<HdfsFileStatusWithId> baseOrOriginalFiles, List<OrcProto.Type> readerTypes,
+      List<HdfsFileStatusWithId> baseFiles,
+      boolean isOriginal,
+      List<ParsedDelta> parsedDeltas,
+      List<OrcProto.Type> readerTypes,
       UserGroupInformation ugi, boolean allowSyntheticFileIds) {
-    Path base = dirInfo.getBaseDirectory();
-    List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
-    List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+    List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
-    boolean isOriginal = base == null;
 
     // if we have a base to work from
-    if (base != null || !original.isEmpty()) {
+    if (!baseFiles.isEmpty()) {
       long totalFileSize = 0;
-      for (HdfsFileStatusWithId child : baseOrOriginalFiles) {
+      for (HdfsFileStatusWithId child : baseFiles) {
         totalFileSize += child.getFileStatus().getLen();
-        AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+        AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
+        opts.writingBase(true);
         int b = opts.getBucket();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.
@@ -1973,31 +2132,32 @@ public class OrcInputFormat implements 
InputFormat<NullWritable, OrcStruct>,
         }
       }
 
-      int numFiles = baseOrOriginalFiles.size();
+      int numFiles = baseFiles.size();
       long avgFileSize = totalFileSize / numFiles;
       int totalFiles = context.numFilesCounter.addAndGet(numFiles);
       switch(context.splitStrategyKind) {
         case BI:
           // BI strategy requested through config
-          return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+          return new BISplitStrategy(context, fs, dir, baseFiles,
               isOriginal, deltas, covered, allowSyntheticFileIds);
         case ETL:
           // ETL strategy requested through config
-          return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+          return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
              deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= context.etlFileThreshold) {
-            return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+            return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
                deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
           } else {
-            return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+            return new BISplitStrategy(context, fs, dir, baseFiles,
                 isOriginal, deltas, covered, allowSyntheticFileIds);
           }
       }
     } else {
       // no base, only deltas
-      return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
+      return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered,
+          context.acidOperationalProperties);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1a1af28..492c64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -25,11 +25,6 @@ import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.orc.impl.AcidStats;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -79,9 +80,13 @@ public class OrcRecordUpdater implements RecordUpdater {
   private static final Charset UTF8 = Charset.forName("UTF-8");
 
   private final AcidOutputFormat.Options options;
+  private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
   private final Path path;
+  private Path deleteEventPath;
   private final FileSystem fs;
+  private OrcFile.WriterOptions writerOptions;
   private Writer writer;
+  private Writer deleteEventWriter = null;
   private final FSDataOutputStream flushLengths;
   private final OrcStruct item;
   private final IntWritable operation = new IntWritable();
@@ -95,9 +100,11 @@ public class OrcRecordUpdater implements RecordUpdater {
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
   private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+  private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
   private StructField originalTxnField = null;  // field inside recId to look for original txn in
+  private StructField bucketField = null; // field inside recId to look for bucket in
   private StructObjectInspector rowInspector; // OI for the original row
   private StructObjectInspector recIdInspector; // OI for the record identifier struct
   private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
@@ -180,8 +187,22 @@ public class OrcRecordUpdater implements RecordUpdater {
   OrcRecordUpdater(Path path,
                    AcidOutputFormat.Options options) throws IOException {
     this.options = options;
+    // Initialize acidOperationalProperties based on table properties, and
+    // if they are not available, see if we can find it in the job configuration.
+    // We have to look at these two places instead of just the conf, because Streaming Ingest
+    // uses table properties, while normal Hive SQL inserts/updates/deletes will place this
+    // value in the configuration object.
+    if (options.getTableProperties() != null) {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getTableProperties());
+    } else {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+    }
     this.bucket.set(options.getBucket());
     this.path = AcidUtils.createFilename(path, options);
+    this.deleteEventWriter = null;
+    this.deleteEventPath = null;
     FileSystem fs = options.getFilesystem();
     if (fs == null) {
       fs = path.getFileSystem(options.getConfiguration());
@@ -205,7 +226,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     } else {
       flushLengths = null;
     }
-    OrcFile.WriterOptions writerOptions = null;
+    this.writerOptions = null;
     // If writing delta dirs, we need to make a clone of original options, to avoid polluting it for
     // the base writer
     if (options.isWritingBase()) {
@@ -242,6 +263,13 @@ public class OrcRecordUpdater implements RecordUpdater {
     writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
     this.writer = OrcFile.createWriter(this.path, writerOptions);
+    if (this.acidOperationalProperties.isSplitUpdate()) {
+      // If this is a split-update, we initialize a delete delta file path in anticipation that
+      // update/delete events will be written to that separate file.
+      // This writes to a file in a directory which starts with "delete_delta_...".
+      // The actual initialization of a writer only happens if any delete events are written.
+      this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true));
+    }
     item = new OrcStruct(FIELDS);
     item.setFieldValue(OPERATION, operation);
     item.setFieldValue(CURRENT_TRANSACTION, currentTransaction);
@@ -250,6 +278,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     item.setFieldValue(ROW_ID, rowId);
   }
 
+  @Override
   public String toString() {
     return getClass().getName() + "[" + path +"]";
   }
@@ -264,14 +293,16 @@ public class OrcRecordUpdater implements RecordUpdater {
     * 1. need to know bucket we are writing to
     * 2. need to know which delta dir it's in
     * Then,
-    * 1. find the same bucket file in previous delta dir for this txn
+    * 1. find the same bucket file in previous (insert) delta dir for this txn
+    *    (Note: in case of split_update, we can ignore the delete_delta dirs)
     * 2. read the footer and get AcidStats which has insert count
-     * 2.1 if AcidStats.inserts>0 done
+     * 2.1 if AcidStats.inserts>0 add to the insert count.
      *  else go to previous delta file
      *  For example, consider insert/update/insert case...*/
     if(options.getStatementId() <= 0) {
       return 0;//there is only 1 statement in this transaction (so far)
     }
+    long totalInserts = 0;
     for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
       Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
       if(!fs.exists(matchingBucket)) {
@@ -281,12 +312,10 @@ public class OrcRecordUpdater implements RecordUpdater {
       //no close() on Reader?!
       AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
       if(acidStats.inserts > 0) {
-        return acidStats.inserts;
+        totalInserts += acidStats.inserts;
       }
     }
-    //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
-    //find any inserts...
-    return 0;
+    return totalInserts;
   }
  // Find the record identifier column (if there) and return a possibly new ObjectInspector that
   // will strain out the record id for the underlying writer.
@@ -307,6 +336,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       // in RecordIdentifier is transactionId, bucketId, rowId
       originalTxnField = fields.get(0);
       origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+      bucketField = fields.get(1);
       rowIdField = fields.get(2);
       rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
 
@@ -316,7 +346,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
   }
 
-  private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
@@ -334,11 +364,60 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
+    item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
     indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
     writer.addRow(item);
   }
 
+  private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+      throws IOException {
+    if (operation == INSERT_OPERATION) {
+      // Just insert the record in the usual way, i.e., default to the simple behavior.
+      addSimpleEvent(operation, currentTransaction, rowId, row);
+      return;
+    }
+    this.operation.set(operation);
+    this.currentTransaction.set(currentTransaction);
+    Object rowValue = rowInspector.getStructFieldData(row, recIdField);
+    long originalTransaction = origTxnInspector.get(
+            recIdInspector.getStructFieldData(rowValue, originalTxnField));
+    rowId = rowIdInspector.get(
+            recIdInspector.getStructFieldData(rowValue, rowIdField));
+
+    if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
+      if (deleteEventWriter == null) {
+        // Initialize an indexBuilder for deleteEvents.
+        deleteEventIndexBuilder = new KeyIndexBuilder();
+        // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
+        // options remain the same.
+
+        // TODO: When we change the callback, we are essentially mutating the writerOptions.
+        // This works but perhaps is not a good thing. The proper way to do this would be
+        // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
+        // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
+        // JIRA to fix this.
+
+        this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
+                                                      writerOptions.callback(deleteEventIndexBuilder));
+      }
+
+      // A delete/update generates a delete event for the original row.
+      this.rowId.set(rowId);
+      this.originalTransaction.set(originalTransaction);
+      item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION));
+      item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
+      deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId);
+      deleteEventWriter.addRow(item);
+    }
+
+    if (operation == UPDATE_OPERATION) {
+      // A new row is also inserted in the usual delta file for an update event.
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
+  }
+
   @Override
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
@@ -347,7 +426,11 @@ public class OrcRecordUpdater implements RecordUpdater {
       //always true in that case
       rowIdOffset = findRowIdOffsetForInsert();
     }
-    addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    } else {
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
     rowCountDelta++;
   }
 
@@ -355,8 +438,13 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      rowIdOffset = findRowIdOffsetForInsert();
+    }
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+    } else {
+      addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
     }
-    addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
   }
 
   @Override
@@ -364,9 +452,12 @@ public class OrcRecordUpdater implements RecordUpdater {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
     }
-    addEvent(DELETE_OPERATION, currentTransaction, -1, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    } else {
+      addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    }
     rowCountDelta--;
-
   }
 
   @Override
@@ -390,13 +481,38 @@ public class OrcRecordUpdater implements RecordUpdater {
         fs.delete(path, false);
       }
     } else {
-      if (writer != null) writer.close();
+      if (writer != null) {
+        if (acidOperationalProperties.isSplitUpdate()) {
+          // When split-update is enabled, we can choose not to write
+          // any delta files when there are no inserts. In such cases only the delete_deltas
+          // would be written & they are closed separately below.
+          if (indexBuilder.acidStats.inserts > 0) {
+            writer.close(); // normal close, when there are inserts.
+          } else {
+            // Just remove insert delta paths, when there are no insert events.
+            fs.delete(path, false);
+          }
+        } else {
+          writer.close(); // normal close.
+        }
+      }
+      if (deleteEventWriter != null) {
+        if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+          // Only need to write out & close the delete_delta if there have been any.
+          deleteEventWriter.close();
+        } else {
+          // Just remove delete_delta, if there have been no delete events.
+          fs.delete(deleteEventPath, false);
+        }
+      }
+
     }
     if (flushLengths != null) {
       flushLengths.close();
       fs.delete(OrcAcidUtils.getSideFile(path), false);
     }
     writer = null;
+    deleteEventWriter = null;
   }
 
   @Override
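
A note on the OrcRecordUpdater changes above: under split-update an update decomposes into a delete event (written through the lazily created delete-delta writer with ROW set to null) plus an insert event in the usual delta. The sketch below reproduces that routing with in-memory lists instead of ORC writers; the class name EventRouterSketch and its fields are illustrative only, not Hive API.

import java.util.ArrayList;
import java.util.List;

// In-memory illustration of addSplitUpdateEvent() above; lists stand in for the ORC writers.
final class EventRouterSketch {
  static final int INSERT = 0, UPDATE = 1, DELETE = 2;

  final List<String> insertDelta = new ArrayList<>();  // plays the role of delta_x_y/bucket_N
  List<String> deleteDelta;                            // plays the role of delete_delta_x_y/bucket_N

  void addEvent(int operation, long originalRowId, String row) {
    if (operation == INSERT) {
      insertDelta.add("insert:" + row);                // inserts go to the usual delta
      return;
    }
    if (deleteDelta == null) {
      deleteDelta = new ArrayList<>();                 // lazy init, like deleteEventWriter
    }
    deleteDelta.add("delete:rowId=" + originalRowId);  // delete event carries the key, ROW is null
    if (operation == UPDATE) {
      insertDelta.add("insert:" + row);                // update = delete old row + insert new row
    }
  }
}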

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8cb5e8a..5f53aef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -3167,10 +3168,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
   private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
                                     List<Path> newFiles) throws HiveException {
-    // The layout for ACID files is table|partname/base|delta/bucket
+    // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
     // We will always only be writing delta files.  In the buckets created by FileSinkOperator
-    // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
-    // For the first mover there will be no delta directory, so we can move the whole directory.
+    // it will look like bucket/delta|delete_delta/bucket.  So we need to move that into
+    // the above structure. For the first mover there will be no delta directory,
+    // so we can move the whole directory.
     // For everyone else we will need to just move the buckets under the existing delta
     // directory.
 
@@ -3193,49 +3195,58 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       for (FileStatus origBucketStat : origBucketStats) {
         Path origBucketPath = origBucketStat.getPath();
-        LOG.debug("Acid move looking for delta files in bucket " + 
origBucketPath);
+        moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
+                fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+        moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
+                fs, dst,origBucketPath, createdDeltaDirs, newFiles);
+      }
+    }
+  }
 
-        FileStatus[] deltaStats = null;
-        try {
-          deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
-        } catch (IOException e) {
-          throw new HiveException("Unable to look for delta files in original bucket " +
-              origBucketPath.toUri().toString(), e);
-        }
-        LOG.debug("Acid move found " + deltaStats.length + " delta files");
-
-        for (FileStatus deltaStat : deltaStats) {
-          Path deltaPath = deltaStat.getPath();
-          // Create the delta directory.  Don't worry if it already exists,
-          // as that likely means another task got to it first.  Then move each of the buckets.
-          // it would be more efficient to try to move the delta with it's buckets but that is
-          // harder to make race condition proof.
-          Path deltaDest = new Path(dst, deltaPath.getName());
+  private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
+                                         Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
+                                         List<Path> newFiles) throws HiveException {
+    LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+
+    FileStatus[] deltaStats = null;
+    try {
+      deltaStats = fs.listStatus(origBucketPath, pathFilter);
+    } catch (IOException e) {
+      throw new HiveException("Unable to look for " + deltaFileType + " files 
in original bucket " +
+          origBucketPath.toUri().toString(), e);
+    }
+    LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " 
files");
+
+    for (FileStatus deltaStat : deltaStats) {
+      Path deltaPath = deltaStat.getPath();
+      // Create the delta directory.  Don't worry if it already exists,
+      // as that likely means another task got to it first.  Then move each of the buckets.
+      // it would be more efficient to try to move the delta with its buckets but that is
+      // harder to make race condition proof.
+      Path deltaDest = new Path(dst, deltaPath.getName());
+      try {
+        if (!createdDeltaDirs.contains(deltaDest)) {
           try {
-            if (!createdDeltaDirs.contains(deltaDest)) {
-              try {
-                fs.mkdirs(deltaDest);
-                createdDeltaDirs.add(deltaDest);
-              } catch (IOException swallowIt) {
-                // Don't worry about this, as it likely just means it's already been created.
-                LOG.info("Unable to create delta directory " + deltaDest +
-                    ", assuming it already exists: " + swallowIt.getMessage());
-              }
-            }
-            FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
-            LOG.debug("Acid move found " + bucketStats.length + " bucket files");
-            for (FileStatus bucketStat : bucketStats) {
-              Path bucketSrc = bucketStat.getPath();
-              Path bucketDest = new Path(deltaDest, bucketSrc.getName());
-              LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to 
" +
-                  bucketDest.toUri().toString());
-              fs.rename(bucketSrc, bucketDest);
-              if (newFiles != null) newFiles.add(bucketDest);
-            }
-          } catch (IOException e) {
-            throw new HiveException("Error moving acid files " + 
e.getMessage(), e);
+            fs.mkdirs(deltaDest);
+            createdDeltaDirs.add(deltaDest);
+          } catch (IOException swallowIt) {
+            // Don't worry about this, as it likely just means it's already been created.
+            LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+                ", assuming it already exists: " + swallowIt.getMessage());
           }
         }
+        FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+        LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+        for (FileStatus bucketStat : bucketStats) {
+          Path bucketSrc = bucketStat.getPath();
+          Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+          LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+              bucketDest.toUri().toString());
+          fs.rename(bucketSrc, bucketDest);
+          if (newFiles != null) newFiles.add(bucketDest);
+        }
+      } catch (IOException e) {
+        throw new HiveException("Error moving acid files " + e.getMessage(), 
e);
       }
     }
   }
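
A note on the Hive.java change above: the move logic is now one routine applied twice, once for delta_ directories and once for delete_delta_ directories. As a rough local-filesystem analogue (java.nio.file instead of Hadoop's FileSystem; class name and hard-coded prefixes are for illustration only):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Local-filesystem sketch of moveAcidDeltaFiles(): move each matching delta directory's
// bucket files under a directory of the same name beneath the destination.
final class AcidMoveSketch {
  static void moveDeltas(Path origBucket, Path dst, String deltaPrefix) throws IOException {
    List<Path> deltaDirs;
    try (Stream<Path> s = Files.list(origBucket)) {
      deltaDirs = s.filter(p -> p.getFileName().toString().startsWith(deltaPrefix))
                   .collect(Collectors.toList());
    }
    for (Path delta : deltaDirs) {
      Path deltaDest = dst.resolve(delta.getFileName());
      Files.createDirectories(deltaDest);       // fine if another task created it first
      List<Path> buckets;
      try (Stream<Path> s = Files.list(delta)) {
        buckets = s.collect(Collectors.toList());
      }
      for (Path bucket : buckets) {
        Files.move(bucket, deltaDest.resolve(bucket.getFileName()));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Called once per prefix, mirroring the two calls added in moveAcidFiles().
    Path origBucket = Path.of(args[0]);
    Path dst = Path.of(args[1]);
    moveDeltas(origBucket, dst, "delta_");
    moveDeltas(origBucket, dst, "delete_delta_");
  }
}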

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 8cf261d..47c65bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -101,6 +101,8 @@ public class TableScanDesc extends AbstractOperatorDesc {
 
   private boolean isAcidTable;
 
+  private AcidUtils.AcidOperationalProperties acidOperationalProperties = null;
+
   private transient TableSample tableSample;
 
   private transient Table tableMetadata;
@@ -127,6 +129,9 @@ public class TableScanDesc extends AbstractOperatorDesc {
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
     isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+    if (isAcidTable) {
+      acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
+    }
   }
 
   @Override
@@ -159,6 +164,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return isAcidTable;
   }
 
+  public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() {
+    return acidOperationalProperties;
+  }
+
   @Explain(displayName = "Output", explainLevels = { Level.USER })
   public List<String> getOutputColumnNames() {
     return this.neededColumns;

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 6caca98..c3e3982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringableMap;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.StringUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to do compactions via an MR job.  This has to be in the ql package rather than metastore
@@ -129,7 +133,7 @@ public class CompactorMR {
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
-    if (ci.properties != null) { // override MR properties and general tblproperties if applicable
+    if (ci.properties != null) {
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
     setColumnTypes(job, sd.getCols());
@@ -137,6 +141,11 @@ public class CompactorMR {
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
     job.setBoolean("mapreduce.map.speculative", false);
+
+    // Set appropriate Acid readers/writers based on the table properties.
+    AcidUtils.setAcidOperationalProperties(job,
+            AcidUtils.getAcidOperationalProperties(t.getParameters()));
+
     return job;
   }
 
@@ -501,12 +510,18 @@ public class CompactorMR {
       Map<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>();
       for (Path dir : dirsToSearch) {
         FileSystem fs = dir.getFileSystem(entries);
+        // When we have split-update, there are two kinds of delta directories-
+        // the delta_x_y/ directory, which has only insert events, and
+        // the delete_delta_x_y/ directory, which has only the delete events.
+        // The clever thing about this kind of splitting is that everything in the delta_x_y/
+        // directory can be processed as base files. However, this is left out currently
+        // as an improvement for the future.
 
-        // If this is a base or delta directory, then we need to be looking for the bucket files.
-        // But if it's a legacy file then we need to add it directly.
         if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
-            dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+            dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
+            dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+
           FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
           for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
@@ -519,6 +534,8 @@ public class CompactorMR {
           addFileToMap(matcher, dir, true, splitToBucketMap);
         }
       }
+
+
       List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
       for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
         BucketTracker bt = e.getValue();
@@ -613,7 +630,8 @@ public class CompactorMR {
       implements Mapper<WritableComparable, CompactorInputSplit,  NullWritable,  NullWritable> {
 
     JobConf jobConf;
-    RecordWriter writer;
+    RecordWriter writer = null;
+    RecordWriter deleteEventWriter = null;
 
     @Override
     public void map(WritableComparable key, CompactorInputSplit split,
@@ -636,10 +654,30 @@ public class CompactorMR {
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
+
+      AcidUtils.AcidOperationalProperties acidOperationalProperties
+          = AcidUtils.getAcidOperationalProperties(jobConf);
+
+      if (!isMajor && acidOperationalProperties.isSplitUpdate()) {
+        // When split-update is enabled for ACID, we initialize a separate deleteEventWriter
+        // that is used to write all the delete events (in case of minor compaction only). For major
+        // compaction, history is not required to be maintained hence the delete events are processed
+        // but not re-written separately.
+        getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
+      }
+
       while (reader.next(identifier, value)) {
-        if (isMajor && reader.isDelete(value)) continue;
-        writer.write(value);
-        reporter.progress();
+        boolean sawDeleteRecord = reader.isDelete(value);
+        if (isMajor && sawDeleteRecord) continue;
+        if (sawDeleteRecord && deleteEventWriter != null) {
+          // When minor compacting, write delete events to a separate file when split-update is
+          // turned on.
+          deleteEventWriter.write(value);
+          reporter.progress();
+        } else {
+          writer.write(value);
+          reporter.progress();
+        }
       }
     }
 
@@ -653,6 +691,9 @@ public class CompactorMR {
       if (writer != null) {
         writer.close(false);
       }
+      if (deleteEventWriter != null) {
+        deleteEventWriter.close(false);
+      }
     }
 
     private void getWriter(Reporter reporter, ObjectInspector inspector,
@@ -679,6 +720,30 @@ public class CompactorMR {
       }
     }
 
+    private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
+        int bucket) throws IOException {
+      if (deleteEventWriter == null) {
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+        options.inspector(inspector)
+          .writingBase(false)
+          .writingDeleteDelta(true)   // this is the option which will make it a delete writer
+          .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+          .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+          .reporter(reporter)
+          .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+          .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+          .bucket(bucket)
+          .statementId(-1);//setting statementId == -1 makes compacted delta files use
+        //delta_xxxx_yyyy format
+
+        // Instantiate the underlying output format
+        @SuppressWarnings("unchecked")//since there is no way to parametrize 
instance of Class
+        AcidOutputFormat<WritableComparable, V> aof =
+          instantiate(AcidOutputFormat.class, 
jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+
+        deleteEventWriter = aof.getRawRecordWriter(new 
Path(jobConf.get(TMP_LOCATION)), options);
+      }
+    }
   }
 
   static class StringableList extends ArrayList<Path> {
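
A note on the compactor change above: during a minor compaction with split-update, delete records are routed to a second writer that targets a delete_delta directory, while a major compaction simply drops them. A minimal, writer-free sketch of that routing (illustrative names, not Hive API):

import java.util.List;

// Writer-free sketch of the record routing in CompactorMap.map() above.
final class CompactorRoutingSketch {
  static void route(boolean isMajor, boolean splitUpdate, List<String> records,
                    List<String> compacted, List<String> deleteDelta) {
    for (String rec : records) {
      boolean isDelete = rec.startsWith("D:");   // stand-in for reader.isDelete(value)
      if (isMajor && isDelete) {
        continue;                                // major compaction drops delete events
      }
      if (isDelete && !isMajor && splitUpdate) {
        deleteDelta.add(rec);                    // minor + split-update: keep deletes separate
      } else {
        compacted.add(rec);                      // everything else goes to the normal output
      }
    }
  }
}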

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index af192fb..08ca9d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -70,18 +70,19 @@ import org.junit.rules.TestName;
  * specifically the tests; the supporting code here is just a clone of TestTxnCommands
  */
 public class TestTxnCommands2 {
-  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+  protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands2.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
   ).getPath().replaceAll("\\\\", "/");
-  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
+  protected static int BUCKET_COUNT = 2;
   @Rule
   public TestName testName = new TestName();
-  private HiveConf hiveConf;
-  private Driver d;
-  private static enum Table {
+
+  protected HiveConf hiveConf;
+  protected Driver d;
+  protected static enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
     NONACIDORCTBL("nonAcidOrcTbl"),
@@ -99,6 +100,10 @@ public class TestTxnCommands2 {
 
   @Before
   public void setUp() throws Exception {
+    setUpWithTableProperties("'transactional'='true'");
+  }
+
+  protected void setUpWithTableProperties(String tableProperties) throws Exception {
     tearDown();
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
@@ -122,12 +127,13 @@ public class TestTxnCommands2 {
     SessionState.start(new SessionState(hiveConf));
     d = new Driver(hiveConf);
     dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) 
clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) 
partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets 
stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) 
clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
(" + tableProperties + ")");
+    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) 
partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets 
stored as orc TBLPROPERTIES (" + tableProperties + ")");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b 
int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc 
TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) 
partitioned by (p string) stored as orc TBLPROPERTIES 
('transactional'='false')");
   }
-  private void dropTables() throws Exception {
+
+  protected void dropTables() throws Exception {
     for(Table t : Table.values()) {
       runStatementOnDriver("drop table if exists " + t);
     }
@@ -731,6 +737,8 @@ public class TestTxnCommands2 {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
@@ -859,11 +867,15 @@ public class TestTxnCommands2 {
    */
   @Test
   public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'");
+  }
+
+  void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception {
     String tblName = "hive12353";
     runStatementOnDriver("drop table if exists " + tblName);
     runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
     hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
     for(int i = 0; i < 5; i++) {
       //generate enough delta files so that Initiator can trigger auto compaction
@@ -1074,11 +1086,15 @@ public class TestTxnCommands2 {
    */
   @Test
   public void writeBetweenWorkerAndCleaner() throws Exception {
+    writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'");
+  }
+
+  protected void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception {
     String tblName = "hive12352";
     runStatementOnDriver("drop table if exists " + tblName);
     runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
 
     //create some data
     runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
@@ -1125,7 +1141,6 @@ public class TestTxnCommands2 {
     Assert.assertEquals("", expected,
       runStatementOnDriver("select a,b from " + tblName + " order by a"));
   }
-
   /**
    * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found.
    * When a heartbeat fails, the query should be failed too.
@@ -1215,17 +1230,78 @@ public class TestTxnCommands2 {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData));
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-    
+
     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
     runWorker(hiveConf);
     runCleaner(hiveConf);
     runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
   }
+
+  @Test
+  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+    testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'");
+  }
+
+  protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception {
+    String tblName = "acidWithSchemaEvol";
+    int numBuckets = 1;
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO " + numBuckets +" BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
+
+    // create some data
+    runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+    runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+    // apply schema evolution by adding some columns
+    runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)");
+
+    // insert some data in new schema
+    runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc'),"
+        + "(5, 'llap', 200, 'tez')");
+
+    // update old data with values for the new schema columns
+    runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3");
+    runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3");
+
+    // read the entire data back and see if we did everything right
+    List<String> rs = runStatementOnDriver("select * from " + tblName + " order by a");
+    String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" };
+    Assert.assertEquals(Arrays.asList(expectedResult), rs);
+
+    // now compact and see if compaction still preserves the data correctness
+    runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf); // Cleaner would remove the obsolete files.
+
+    // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have been cleaned.
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()),
+        FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(numBuckets, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select * from " + tblName + " order by a");
+    Assert.assertEquals(Arrays.asList(expectedResult), rs);
+  }
+
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */
-  private List<String> stringifyValues(int[][] rowsIn) {
+  protected List<String> stringifyValues(int[][] rowsIn) {
     assert rowsIn.length > 0;
     int[][] rows = rowsIn.clone();
     Arrays.sort(rows, new RowComp());
@@ -1275,7 +1351,7 @@ public class TestTxnCommands2 {
     return sb.toString();
   }
 
-  private List<String> runStatementOnDriver(String stmt) throws Exception {
+  protected List<String> runStatementOnDriver(String stmt) throws Exception {
     CommandProcessorResponse cpr = d.run(stmt);
     if(cpr.getResponseCode() != 0) {
       throw new RuntimeException(stmt + " failed: " + cpr);
