http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 334cb31..6261a14 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -80,7 +80,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileInfo; +import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties; import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -525,6 +528,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final ValidTxnList transactionList; private SplitStrategyKind splitStrategyKind; private final SearchArgument sarg; + private final AcidOperationalProperties acidOperationalProperties; Context(Configuration conf) throws IOException { this(conf, 1, null); @@ -606,6 +610,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } String value = conf.get(ValidTxnList.VALID_TXNS_KEY); transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value); + + // Determine the transactional_properties of the table from the job conf stored in context. + // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), + // & therefore we should be able to retrieve them here and determine appropriate behavior. + // Note that this will be meaningless for non-acid tables & will be set to null. + boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); + String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + this.acidOperationalProperties = isTableTransactional ? 
+ AcidOperationalProperties.parseString(transactionalProperties) : null; } @VisibleForTesting @@ -639,17 +652,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @VisibleForTesting static final class AcidDirInfo { public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo, - List<HdfsFileStatusWithId> baseOrOriginalFiles) { + List<AcidBaseFileInfo> baseFiles, + List<ParsedDelta> parsedDeltas) { this.splitPath = splitPath; this.acidInfo = acidInfo; - this.baseOrOriginalFiles = baseOrOriginalFiles; + this.baseFiles = baseFiles; this.fs = fs; + this.parsedDeltas = parsedDeltas; } final FileSystem fs; final Path splitPath; final AcidUtils.Directory acidInfo; - final List<HdfsFileStatusWithId> baseOrOriginalFiles; + final List<AcidBaseFileInfo> baseFiles; + final List<ParsedDelta> parsedDeltas; } @VisibleForTesting @@ -672,7 +688,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail, List<OrcProto.Type> readerTypes, boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException { - super(dir, context.numBuckets, deltas, covered); + super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties); this.context = context; this.fs = fs; this.fileWithId = fileWithId; @@ -916,7 +932,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public BISplitStrategy(Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) { - super(dir, context.numBuckets, deltas, covered); + super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties); this.fileStatuses = fileStatuses; this.isOriginal = isOriginal; this.deltas = deltas; @@ -964,20 +980,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<DeltaMetaData> deltas; boolean[] covered; int numBuckets; + AcidOperationalProperties acidOperationalProperties; - public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) { + public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered, + AcidOperationalProperties acidOperationalProperties) { this.dir = dir; this.numBuckets = numBuckets; this.deltas = deltas; this.covered = covered; + this.acidOperationalProperties = acidOperationalProperties; } @Override public List<OrcSplit> getSplits() throws IOException { + List<OrcSplit> splits = Lists.newArrayList(); + + // When split-update is enabled, we do not need to account for buckets that aren't covered. + // This is a huge performance benefit of split-update. And the reason why we are able to + // do so is because the 'deltas' here are actually only the delete_deltas. All the insert_deltas + // with valid user payload data has already been considered as base for the covered buckets. + // Hence, the uncovered buckets do not have any relevant data and we can just ignore them. + if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) { + return splits; // return an empty list. + } + // Generate a split for any buckets that weren't covered. // This happens in the case where a bucket just has deltas and no // base. 
- List<OrcSplit> splits = Lists.newArrayList(); if (!deltas.isEmpty()) { for (int b = 0; b < numBuckets; ++b) { if (!covered[b]) { @@ -1032,13 +1061,70 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } private AcidDirInfo callInternal() throws IOException { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, - context.conf, context.transactionList, useFileIds, true); + AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, + context.transactionList, useFileIds, true); Path base = dirInfo.getBaseDirectory(); // find the base files (original or new style) - List<HdfsFileStatusWithId> children = (base == null) - ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds); - return new AcidDirInfo(fs, dir, dirInfo, children); + List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>(); + if (base == null) { + for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + } + } else { + List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds); + for (HdfsFileStatusWithId fileId : compactedBaseFiles) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE)); + } + } + + // Find the parsed deltas- some of them containing only the insert delta events + // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) + List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>(); + + if (context.acidOperationalProperties != null && + context.acidOperationalProperties.isSplitUpdate()) { + // If we have split-update turned on for this table, then the delta events have already been + // split into two directories- delta_x_y/ and delete_delta_x_y/. + // When you have split-update turned on, the insert events go to delta_x_y/ directory and all + // the delete events go to delete_x_y/. An update event will generate two events- + // a delete event for the old record that is put into delete_delta_x_y/, + // followed by an insert event for the updated record put into the usual delta_x_y/. + // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/ + // can be treated like base files. Hence, each of these are added to baseOrOriginalFiles list. + + for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) { + if (parsedDelta.isDeleteDelta()) { + parsedDeltas.add(parsedDelta); + } else { + // This is a normal insert delta, which only has insert events and hence all the files + // in this delta directory can be considered as a base. + if (useFileIds) { + try { + List<HdfsFileStatusWithId> insertDeltaFiles = + SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter); + for (HdfsFileStatusWithId fileId : insertDeltaFiles) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + } + continue; // move on to process to the next parsedDelta. + } catch (Throwable t) { + LOG.error("Failed to get files with ID; using regular API: " + t.getMessage()); + } + } + // Fall back to regular API and create statuses without ID. 
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter); + for (FileStatus child : children) { + HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); + } + } + } + + } else { + // When split-update is not enabled, then all the deltas in the current directories + // should be considered as usual. + parsedDeltas.addAll(dirInfo.getCurrentDirectories()); + } + return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas); } private List<HdfsFileStatusWithId> findBaseFiles( @@ -1526,26 +1612,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, continue; } - // We have received a new directory information, make a split strategy. + // We have received a new directory information, make split strategies. --resultsLeft; - SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs, - adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi, + + // The reason why we can get a list of split strategies here is because for ACID split-update + // case when we have a mix of original base files & insert deltas, we will produce two + // independent split strategies for them. There is a global flag 'isOriginal' that is set + // on a per split strategy basis and it has to be same for all the files in that strategy. + List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs, + adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); - if (splitStrategy == null) continue; // Combined. - if (isDebugEnabled) { - LOG.debug("Split strategy: {}", splitStrategy); - } + for (SplitStrategy<?> splitStrategy : splitStrategies) { + if (isDebugEnabled) { + LOG.debug("Split strategy: {}", splitStrategy); + } - // Hack note - different split strategies return differently typed lists, yay Java. - // This works purely by magic, because we know which strategy produces which type. - if (splitStrategy instanceof ETLSplitStrategy) { - scheduleSplits((ETLSplitStrategy)splitStrategy, - context, splitFutures, strategyFutures, splits); - } else { - @SuppressWarnings("unchecked") - List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits(); - splits.addAll(readySplits); + // Hack note - different split strategies return differently typed lists, yay Java. + // This works purely by magic, because we know which strategy produces which type. + if (splitStrategy instanceof ETLSplitStrategy) { + scheduleSplits((ETLSplitStrategy)splitStrategy, + context, splitFutures, strategyFutures, splits); + } else { + @SuppressWarnings("unchecked") + List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits(); + splits.addAll(readySplits); + } } } @@ -1763,6 +1855,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); + Path root; if (split.hasBase()) { if (split.isOriginal()) { @@ -1773,7 +1866,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } else { root = path; } - final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); + + // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat. 
+ AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + + // The deltas are decided based on whether split-update has been turned on for the table or not. + // When split-update is turned off, everything in the delta_x_y/ directory should be treated + // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory + // need to be considered as delta, because files in delta_x_y/ will be processed as base files + // since they only have insert events in them. + final Path[] deltas = + acidOperationalProperties.isSplitUpdate() ? + AcidUtils.deserializeDeleteDeltas(root, split.getDeltas()) + : AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); @@ -1793,7 +1899,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); if (split.hasBase()) { - bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) + bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf) .getBucket(); OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf) .maxLength(split.getFileLength()); @@ -1948,23 +2054,76 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } @VisibleForTesting + static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context, + FileSystem fs, Path dir, AcidUtils.Directory dirInfo, + List<AcidBaseFileInfo> baseFiles, + List<ParsedDelta> parsedDeltas, + List<OrcProto.Type> readerTypes, + UserGroupInformation ugi, boolean allowSyntheticFileIds) { + List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>(); + SplitStrategy<?> splitStrategy; + + // When no baseFiles, we will just generate a single split strategy and return. + List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>(); + if (baseFiles.isEmpty()) { + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); + if (splitStrategy != null) { + splitStrategies.add(splitStrategy); + } + return splitStrategies; // return here + } + + List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>(); + // Separate the base files into acid schema and non-acid(original) schema files. + for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) { + if (acidBaseFileInfo.isOriginal()) { + originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); + } else { + acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId()); + } + } + + // Generate split strategy for non-acid schema original files, if any. + if (!originalSchemaFiles.isEmpty()) { + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); + if (splitStrategy != null) { + splitStrategies.add(splitStrategy); + } + } + + // Generate split strategy for acid schema files, if any. 
+ if (!acidSchemaFiles.isEmpty()) { + splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo, + acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds); + if (splitStrategy != null) { + splitStrategies.add(splitStrategy); + } + } + + return splitStrategies; + } + + @VisibleForTesting static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context, FileSystem fs, Path dir, AcidUtils.Directory dirInfo, - List<HdfsFileStatusWithId> baseOrOriginalFiles, List<OrcProto.Type> readerTypes, + List<HdfsFileStatusWithId> baseFiles, + boolean isOriginal, + List<ParsedDelta> parsedDeltas, + List<OrcProto.Type> readerTypes, UserGroupInformation ugi, boolean allowSyntheticFileIds) { - Path base = dirInfo.getBaseDirectory(); - List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles(); - List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); + List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas); boolean[] covered = new boolean[context.numBuckets]; - boolean isOriginal = base == null; // if we have a base to work from - if (base != null || !original.isEmpty()) { + if (!baseFiles.isEmpty()) { long totalFileSize = 0; - for (HdfsFileStatusWithId child : baseOrOriginalFiles) { + for (HdfsFileStatusWithId child : baseFiles) { totalFileSize += child.getFileStatus().getLen(); - AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename + AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename (child.getFileStatus().getPath(), context.conf); + opts.writingBase(true); int b = opts.getBucket(); // If the bucket is in the valid range, mark it as covered. // I wish Hive actually enforced bucketing all of the time. @@ -1973,31 +2132,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } - int numFiles = baseOrOriginalFiles.size(); + int numFiles = baseFiles.size(); long avgFileSize = totalFileSize / numFiles; int totalFiles = context.numFilesCounter.addAndGet(numFiles); switch(context.splitStrategyKind) { case BI: // BI strategy requested through config - return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, + return new BISplitStrategy(context, fs, dir, baseFiles, isOriginal, deltas, covered, allowSyntheticFileIds); case ETL: // ETL strategy requested through config - return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles, + return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles, deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds); default: // HYBRID strategy if (avgFileSize > context.maxSize || totalFiles <= context.etlFileThreshold) { - return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles, + return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles, deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds); } else { - return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles, + return new BISplitStrategy(context, fs, dir, baseFiles, isOriginal, deltas, covered, allowSyntheticFileIds); } } } else { // no base, only deltas - return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered); + return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered, + context.acidOperationalProperties); } }
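The split-strategy changes above hinge on how directories under a table or partition are categorized once split-update is enabled. The standalone sketch below is not Hive's actual API; the class, method, and enum names are illustrative, while the prefix strings mirror AcidUtils.BASE_PREFIX, DELTA_PREFIX, and DELETE_DELTA_PREFIX referenced in the diff.

// Illustrative only: a simplified model of the directory layout the reader sees.
//   base_N/            -> compacted base files
//   delta_x_y/         -> under split-update, insert events only, so treated like base
//   delete_delta_x_y/  -> delete events, the only directories still handled as deltas
public final class SplitUpdateLayoutSketch {

  public enum DirKind { BASE, INSERT_DELTA_AS_BASE, DELTA, DELETE_DELTA, ORIGINAL }

  private static final String BASE_PREFIX = "base_";
  private static final String DELTA_PREFIX = "delta_";
  private static final String DELETE_DELTA_PREFIX = "delete_delta_";

  public static DirKind classify(String dirName, boolean splitUpdate) {
    if (dirName.startsWith(BASE_PREFIX)) {
      return DirKind.BASE;
    }
    if (dirName.startsWith(DELETE_DELTA_PREFIX)) {
      // Only produced when split-update is on; these carry the delete events.
      return DirKind.DELETE_DELTA;
    }
    if (dirName.startsWith(DELTA_PREFIX)) {
      // With split-update, delta_x_y/ contains only insert events and is read as base.
      return splitUpdate ? DirKind.INSERT_DELTA_AS_BASE : DirKind.DELTA;
    }
    // Anything else is an "original" (pre-ACID schema) file.
    return DirKind.ORIGINAL;
  }
}

Seen through this lens, determineSplitStrategies produces at most two strategies per directory: one for files classified as ORIGINAL (non-ACID schema) and one for everything read with the ACID schema (base files plus insert deltas treated as base).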
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 1a1af28..492c64c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -25,11 +25,6 @@ import java.nio.charset.CharsetDecoder; import java.util.ArrayList; import java.util.List; -import org.apache.orc.impl.AcidStats; -import org.apache.orc.impl.OrcAcidUtils; -import org.apache.orc.OrcConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -79,9 +80,13 @@ public class OrcRecordUpdater implements RecordUpdater { private static final Charset UTF8 = Charset.forName("UTF-8"); private final AcidOutputFormat.Options options; + private final AcidUtils.AcidOperationalProperties acidOperationalProperties; private final Path path; + private Path deleteEventPath; private final FileSystem fs; + private OrcFile.WriterOptions writerOptions; private Writer writer; + private Writer deleteEventWriter = null; private final FSDataOutputStream flushLengths; private final OrcStruct item; private final IntWritable operation = new IntWritable(); @@ -95,9 +100,11 @@ public class OrcRecordUpdater implements RecordUpdater { // because that is monotonically increasing to give new unique row ids. 
private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private KeyIndexBuilder deleteEventIndexBuilder; private StructField recIdField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row private StructObjectInspector recIdInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier @@ -180,8 +187,22 @@ public class OrcRecordUpdater implements RecordUpdater { OrcRecordUpdater(Path path, AcidOutputFormat.Options options) throws IOException { this.options = options; + // Initialize acidOperationalProperties based on table properties, and + // if they are not available, see if we can find it in the job configuration. + // We have to look at these two places instead of just the conf, because Streaming Ingest + // uses table properties, while normal Hive SQL inserts/updates/deletes will place this + // value in the configuration object. + if (options.getTableProperties() != null) { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getTableProperties()); + } else { + this.acidOperationalProperties = + AcidUtils.getAcidOperationalProperties(options.getConfiguration()); + } this.bucket.set(options.getBucket()); this.path = AcidUtils.createFilename(path, options); + this.deleteEventWriter = null; + this.deleteEventPath = null; FileSystem fs = options.getFilesystem(); if (fs == null) { fs = path.getFileSystem(options.getConfiguration()); @@ -205,7 +226,7 @@ public class OrcRecordUpdater implements RecordUpdater { } else { flushLengths = null; } - OrcFile.WriterOptions writerOptions = null; + this.writerOptions = null; // If writing delta dirs, we need to make a clone of original options, to avoid polluting it for // the base writer if (options.isWritingBase()) { @@ -242,6 +263,13 @@ public class OrcRecordUpdater implements RecordUpdater { writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), options.getRecordIdColumn()))); this.writer = OrcFile.createWriter(this.path, writerOptions); + if (this.acidOperationalProperties.isSplitUpdate()) { + // If this is a split-update, we initialize a delete delta file path in anticipation that + // they would write update/delete events to that separate file. + // This writes to a file in directory which starts with "delete_delta_..." + // The actual initialization of a writer only happens if any delete events are written. + this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true)); + } item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); @@ -250,6 +278,7 @@ public class OrcRecordUpdater implements RecordUpdater { item.setFieldValue(ROW_ID, rowId); } + @Override public String toString() { return getClass().getName() + "[" + path +"]"; } @@ -264,14 +293,16 @@ public class OrcRecordUpdater implements RecordUpdater { * 1. need to know bucket we are writing to * 2. need to know which delta dir it's in * Then, - * 1. find the same bucket file in previous delta dir for this txn + * 1. 
find the same bucket file in previous (insert) delta dir for this txn + * (Note: in case of split_update, we can ignore the delete_delta dirs) * 2. read the footer and get AcidStats which has insert count - * 2.1 if AcidStats.inserts>0 done + * 2.1 if AcidStats.inserts>0 add to the insert count. * else go to previous delta file * For example, consider insert/update/insert case...*/ if(options.getStatementId() <= 0) { return 0;//there is only 1 statement in this transaction (so far) } + long totalInserts = 0; for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); if(!fs.exists(matchingBucket)) { @@ -281,12 +312,10 @@ public class OrcRecordUpdater implements RecordUpdater { //no close() on Reader?! AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); if(acidStats.inserts > 0) { - return acidStats.inserts; + totalInserts += acidStats.inserts; } } - //if we got here, we looked at all delta files in this txn, prior to current statement and didn't - //find any inserts... - return 0; + return totalInserts; } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. @@ -307,6 +336,7 @@ public class OrcRecordUpdater implements RecordUpdater { // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + bucketField = fields.get(1); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); @@ -316,7 +346,7 @@ public class OrcRecordUpdater implements RecordUpdater { } } - private void addEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); @@ -334,11 +364,60 @@ public class OrcRecordUpdater implements RecordUpdater { } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } + private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) + throws IOException { + if (operation == INSERT_OPERATION) { + // Just insert the record in the usual way, i.e., default to the simple behavior. + addSimpleEvent(operation, currentTransaction, rowId, row); + return; + } + this.operation.set(operation); + this.currentTransaction.set(currentTransaction); + Object rowValue = rowInspector.getStructFieldData(row, recIdField); + long originalTransaction = origTxnInspector.get( + recIdInspector.getStructFieldData(rowValue, originalTxnField)); + rowId = rowIdInspector.get( + recIdInspector.getStructFieldData(rowValue, rowIdField)); + + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + // Initialize a deleteEventWriter if not yet done. (Lazy initialization) + if (deleteEventWriter == null) { + // Initialize an indexBuilder for deleteEvents. 
+ deleteEventIndexBuilder = new KeyIndexBuilder(); + // Change the indexBuilder callback too for the deleteEvent file, the remaining writer + // options remain the same. + + // TODO: When we change the callback, we are essentially mutating the writerOptions. + // This works but perhaps is not a good thing. The proper way to do this would be + // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions + // implements a clone() method (which it does not for now). HIVE-14514 is currently an open + // JIRA to fix this. + + this.deleteEventWriter = OrcFile.createWriter(deleteEventPath, + writerOptions.callback(deleteEventIndexBuilder)); + } + + // A delete/update generates a delete event for the original row. + this.rowId.set(rowId); + this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); + item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. + deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + deleteEventWriter.addRow(item); + } + + if (operation == UPDATE_OPERATION) { + // A new row is also inserted in the usual delta file for an update event. + addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } + } + @Override public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { @@ -347,7 +426,11 @@ public class OrcRecordUpdater implements RecordUpdater { //always true in that case rowIdOffset = findRowIdOffsetForInsert(); } - addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } else { + addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + } rowCountDelta++; } @@ -355,8 +438,13 @@ public class OrcRecordUpdater implements RecordUpdater { public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; + rowIdOffset = findRowIdOffsetForInsert(); + } + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row); + } else { + addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } - addEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } @Override @@ -364,9 +452,12 @@ public class OrcRecordUpdater implements RecordUpdater { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, -1, row); + if (acidOperationalProperties.isSplitUpdate()) { + addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } else { + addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row); + } rowCountDelta--; - } @Override @@ -390,13 +481,38 @@ public class OrcRecordUpdater implements RecordUpdater { fs.delete(path, false); } } else { - if (writer != null) writer.close(); + if (writer != null) { + if (acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled, we can choose not to write + // any delta files when there are no inserts. In such cases only the delete_deltas + // would be written & they are closed separately below. + if (indexBuilder.acidStats.inserts > 0) { + writer.close(); // normal close, when there are inserts. 
+ } else { + // Just remove insert delta paths, when there are no insert events. + fs.delete(path, false); + } + } else { + writer.close(); // normal close. + } + } + if (deleteEventWriter != null) { + if (deleteEventIndexBuilder.acidStats.deletes > 0) { + // Only need to write out & close the delete_delta if there have been any. + deleteEventWriter.close(); + } else { + // Just remove delete_delta, if there have been no delete events. + fs.delete(deleteEventPath, false); + } + } + } if (flushLengths != null) { flushLengths.close(); fs.delete(OrcAcidUtils.getSideFile(path), false); } writer = null; + deleteEventWriter = null; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8cb5e8a..5f53aef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -3167,10 +3168,11 @@ private void constructOneLBLocationMap(FileStatus fSta, private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List<Path> newFiles) throws HiveException { - // The layout for ACID files is table|partname/base|delta/bucket + // The layout for ACID files is table|partname/base|delta|delete_delta/bucket // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta/bucket. So we need to move that into the above structure. - // For the first mover there will be no delta directory, so we can move the whole directory. + // it will look like bucket/delta|delete_delta/bucket. So we need to move that into + // the above structure. For the first mover there will be no delta directory, + // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. @@ -3193,49 +3195,58 @@ private void constructOneLBLocationMap(FileStatus fSta, for (FileStatus origBucketStat : origBucketStats) { Path origBucketPath = origBucketStat.getPath(); - LOG.debug("Acid move looking for delta files in bucket " + origBucketPath); + moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter, + fs, dst, origBucketPath, createdDeltaDirs, newFiles); + moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, + fs, dst,origBucketPath, createdDeltaDirs, newFiles); + } + } + } - FileStatus[] deltaStats = null; - try { - deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter); - } catch (IOException e) { - throw new HiveException("Unable to look for delta files in original bucket " + - origBucketPath.toUri().toString(), e); - } - LOG.debug("Acid move found " + deltaStats.length + " delta files"); - - for (FileStatus deltaStat : deltaStats) { - Path deltaPath = deltaStat.getPath(); - // Create the delta directory. Don't worry if it already exists, - // as that likely means another task got to it first. Then move each of the buckets. 
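The hunk below factors the bucket-moving loop out of moveAcidFiles into moveAcidDeltaFiles and invokes it twice, once with AcidUtils.deltaFileFilter and once with the new AcidUtils.deleteEventDeltaDirFilter. As a rough illustration of what such a prefix-based filter amounts to (the real filter lives in AcidUtils and may differ in detail), a standalone delete-delta directory filter could look like this:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Hypothetical stand-in for AcidUtils.deleteEventDeltaDirFilter: accepts paths whose
// final component starts with the delete-delta prefix, i.e. the directories where
// split-update writes its delete events.
public class DeleteDeltaDirFilterSketch implements PathFilter {
  private static final String DELETE_DELTA_PREFIX = "delete_delta_";

  @Override
  public boolean accept(Path path) {
    return path.getName().startsWith(DELETE_DELTA_PREFIX);
  }
}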
- // it would be more efficient to try to move the delta with it's buckets but that is - // harder to make race condition proof. - Path deltaDest = new Path(dst, deltaPath.getName()); + private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs, + Path dst, Path origBucketPath, Set<Path> createdDeltaDirs, + List<Path> newFiles) throws HiveException { + LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath); + + FileStatus[] deltaStats = null; + try { + deltaStats = fs.listStatus(origBucketPath, pathFilter); + } catch (IOException e) { + throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " + + origBucketPath.toUri().toString(), e); + } + LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files"); + + for (FileStatus deltaStat : deltaStats) { + Path deltaPath = deltaStat.getPath(); + // Create the delta directory. Don't worry if it already exists, + // as that likely means another task got to it first. Then move each of the buckets. + // it would be more efficient to try to move the delta with it's buckets but that is + // harder to make race condition proof. + Path deltaDest = new Path(dst, deltaPath.getName()); + try { + if (!createdDeltaDirs.contains(deltaDest)) { try { - if (!createdDeltaDirs.contains(deltaDest)) { - try { - fs.mkdirs(deltaDest); - createdDeltaDirs.add(deltaDest); - } catch (IOException swallowIt) { - // Don't worry about this, as it likely just means it's already been created. - LOG.info("Unable to create delta directory " + deltaDest + - ", assuming it already exists: " + swallowIt.getMessage()); - } - } - FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); - LOG.debug("Acid move found " + bucketStats.length + " bucket files"); - for (FileStatus bucketStat : bucketStats) { - Path bucketSrc = bucketStat.getPath(); - Path bucketDest = new Path(deltaDest, bucketSrc.getName()); - LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + - bucketDest.toUri().toString()); - fs.rename(bucketSrc, bucketDest); - if (newFiles != null) newFiles.add(bucketDest); - } - } catch (IOException e) { - throw new HiveException("Error moving acid files " + e.getMessage(), e); + fs.mkdirs(deltaDest); + createdDeltaDirs.add(deltaDest); + } catch (IOException swallowIt) { + // Don't worry about this, as it likely just means it's already been created. 
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest + + ", assuming it already exists: " + swallowIt.getMessage()); } } + FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); + LOG.debug("Acid move found " + bucketStats.length + " bucket files"); + for (FileStatus bucketStat : bucketStats) { + Path bucketSrc = bucketStat.getPath(); + Path bucketDest = new Path(deltaDest, bucketSrc.getName()); + LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + + bucketDest.toUri().toString()); + fs.rename(bucketSrc, bucketDest); + if (newFiles != null) newFiles.add(bucketDest); + } + } catch (IOException e) { + throw new HiveException("Error moving acid files " + e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 8cf261d..47c65bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -101,6 +101,8 @@ public class TableScanDesc extends AbstractOperatorDesc { private boolean isAcidTable; + private AcidUtils.AcidOperationalProperties acidOperationalProperties = null; + private transient TableSample tableSample; private transient Table tableMetadata; @@ -127,6 +129,9 @@ public class TableScanDesc extends AbstractOperatorDesc { this.virtualCols = vcs; this.tableMetadata = tblMetadata; isAcidTable = AcidUtils.isAcidTable(this.tableMetadata); + if (isAcidTable) { + acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata); + } } @Override @@ -159,6 +164,10 @@ public class TableScanDesc extends AbstractOperatorDesc { return isAcidTable; } + public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() { + return acidOperationalProperties; + } + @Explain(displayName = "Output", explainLevels = { Level.USER }) public List<String> getOutputColumnNames() { return this.neededColumns; http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 6caca98..c3e3982 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -17,9 +17,16 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.hive.common.ValidCompactorTxnList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Matcher; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -27,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringableMap; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; import 
org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.StringUtils; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.*; -import java.util.regex.Matcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class to do compactions via an MR job. This has to be in the ql package rather than metastore @@ -129,7 +133,7 @@ public class CompactorMR { job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable - if (ci.properties != null) { // override MR properties and general tblproperties if applicable + if (ci.properties != null) { overrideTblProps(job, t.getParameters(), ci.properties); } setColumnTypes(job, sd.getCols()); @@ -137,6 +141,11 @@ public class CompactorMR { //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter //to do the final move job.setBoolean("mapreduce.map.speculative", false); + + // Set appropriate Acid readers/writers based on the table properties. + AcidUtils.setAcidOperationalProperties(job, + AcidUtils.getAcidOperationalProperties(t.getParameters())); + return job; } @@ -501,12 +510,18 @@ public class CompactorMR { Map<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>(); for (Path dir : dirsToSearch) { FileSystem fs = dir.getFileSystem(entries); + // When we have split-update and there are two kinds of delta directories- + // the delta_x_y/ directory one which has only insert events and + // the delete_delta_x_y/ directory which has only the delete events. + // The clever thing about this kind of splitting is that everything in the delta_x_y/ + // directory can be processed as base files. However, this is left out currently + // as an improvement for the future. - // If this is a base or delta directory, then we need to be looking for the bucket files. - // But if it's a legacy file then we need to add it directly. if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) || - dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) { + dir.getName().startsWith(AcidUtils.DELTA_PREFIX) || + dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX); + FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter); for(FileStatus f : files) { // For each file, figure out which bucket it is. 
@@ -519,6 +534,8 @@ public class CompactorMR { addFileToMap(matcher, dir, true, splitToBucketMap); } } + + List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size()); for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) { BucketTracker bt = e.getValue(); @@ -613,7 +630,8 @@ public class CompactorMR { implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> { JobConf jobConf; - RecordWriter writer; + RecordWriter writer = null; + RecordWriter deleteEventWriter = null; @Override public void map(WritableComparable key, CompactorInputSplit split, @@ -636,10 +654,30 @@ public class CompactorMR { RecordIdentifier identifier = reader.createKey(); V value = reader.createValue(); getWriter(reporter, reader.getObjectInspector(), split.getBucket()); + + AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(jobConf); + + if (!isMajor && acidOperationalProperties.isSplitUpdate()) { + // When split-update is enabled for ACID, we initialize a separate deleteEventWriter + // that is used to write all the delete events (in case of minor compaction only). For major + // compaction, history is not required to be maintained hence the delete events are processed + // but not re-written separately. + getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket()); + } + while (reader.next(identifier, value)) { - if (isMajor && reader.isDelete(value)) continue; - writer.write(value); - reporter.progress(); + boolean sawDeleteRecord = reader.isDelete(value); + if (isMajor && sawDeleteRecord) continue; + if (sawDeleteRecord && deleteEventWriter != null) { + // When minor compacting, write delete events to a separate file when split-update is + // turned on. 
+ deleteEventWriter.write(value); + reporter.progress(); + } else { + writer.write(value); + reporter.progress(); + } } } @@ -653,6 +691,9 @@ public class CompactorMR { if (writer != null) { writer.close(false); } + if (deleteEventWriter != null) { + deleteEventWriter.close(false); + } } private void getWriter(Reporter reporter, ObjectInspector inspector, @@ -679,6 +720,30 @@ public class CompactorMR { } } + private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, + int bucket) throws IOException { + if (deleteEventWriter == null) { + AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf); + options.inspector(inspector) + .writingBase(false) + .writingDeleteDelta(true) // this is the option which will make it a delete writer + .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false)) + .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties()) + .reporter(reporter) + .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) + .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use + //delta_xxxx_yyyy format + + // Instantiate the underlying output format + @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class + AcidOutputFormat<WritableComparable, V> aof = + instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME)); + + deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options); + } + } } static class StringableList extends ArrayList<Path> { http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index af192fb..08ca9d5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -70,18 +70,19 @@ import org.junit.rules.TestName; * specifically the tests; the supporting code here is just a clone of TestTxnCommands */ public class TestTxnCommands2 { - private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + TestTxnCommands2.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - private static int BUCKET_COUNT = 2; + protected static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); - private HiveConf hiveConf; - private Driver d; - private static enum Table { + + protected HiveConf hiveConf; + protected Driver d; + protected static enum Table { ACIDTBL("acidTbl"), ACIDTBLPART("acidTblPart"), NONACIDORCTBL("nonAcidOrcTbl"), @@ -99,6 +100,10 @@ public class TestTxnCommands2 { @Before public void setUp() throws Exception { + setUpWithTableProperties("'transactional'='true'"); + } + + protected void setUpWithTableProperties(String tableProperties) throws Exception { tearDown(); hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); 
@@ -122,12 +127,13 @@ public class TestTxnCommands2 { SessionState.start(new SessionState(hiveConf)); d = new Driver(hiveConf); dropTables(); - runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")"); runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); } - private void dropTables() throws Exception { + + protected void dropTables() throws Exception { for(Table t : Table.values()) { runStatementOnDriver("drop table if exists " + t); } @@ -731,6 +737,8 @@ public class TestTxnCommands2 { Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } + + @Test public void testValidTxnsBookkeeping() throws Exception { // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf @@ -859,11 +867,15 @@ public class TestTxnCommands2 { */ @Test public void testInitiatorWithMultipleFailedCompactions() throws Exception { + testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'"); + } + + void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12353"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4); for(int i = 0; i < 5; i++) { //generate enough delta files so that Initiator can trigger auto compaction @@ -1074,11 +1086,15 @@ public class TestTxnCommands2 { */ @Test public void writeBetweenWorkerAndCleaner() throws Exception { + writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'"); + } + + protected void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception { String tblName = "hive12352"; runStatementOnDriver("drop table if exists " + tblName); runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed - " STORED AS ORC TBLPROPERTIES ('transactional'='true')"); + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); //create some data runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')"); @@ -1125,7 +1141,6 @@ public class TestTxnCommands2 { Assert.assertEquals("", expected, runStatementOnDriver("select a,b from " + 
tblName + " order by a")); } - /** * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found. * When a heartbeat fails, the query should be failed too. @@ -1215,17 +1230,78 @@ public class TestTxnCommands2 { hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); runWorker(hiveConf); runCleaner(hiveConf); runStatementOnDriver("select count(*) from " + Table.ACIDTBL); } + + @Test + public void testACIDwithSchemaEvolutionAndCompaction() throws Exception { + testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'"); + } + + protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception { + String tblName = "acidWithSchemaEvol"; + int numBuckets = 1; + runStatementOnDriver("drop table if exists " + tblName); + runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " CLUSTERED BY(a) INTO " + numBuckets +" BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )"); + + // create some data + runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')"); + runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3"); + + // apply schema evolution by adding some columns + runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)"); + + // insert some data in new schema + runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc')," + + "(5, 'llap', 200, 'tez')"); + + // update old data with values for the new schema columns + runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3"); + runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3"); + + // read the entire data back and see if did everything right + List<String> rs = runStatementOnDriver("select * from " + tblName + " order by a"); + String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" }; + Assert.assertEquals(Arrays.asList(expectedResult), rs); + + // now compact and see if compaction still preserves the data correctness + runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'"); + runWorker(hiveConf); + runCleaner(hiveConf); // Cleaner would remove the obsolete files. + + // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have have been cleaned. 
+ FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()), + FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + boolean sawNewBase = false; + for (int i = 0; i < status.length; i++) { + if (status[i].getPath().getName().matches("base_.*")) { + sawNewBase = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(numBuckets, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000")); + } + } + Assert.assertTrue(sawNewBase); + + rs = runStatementOnDriver("select * from " + tblName + " order by a"); + Assert.assertEquals(Arrays.asList(expectedResult), rs); + } + + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order */ - private List<String> stringifyValues(int[][] rowsIn) { + protected List<String> stringifyValues(int[][] rowsIn) { assert rowsIn.length > 0; int[][] rows = rowsIn.clone(); Arrays.sort(rows, new RowComp()); @@ -1275,7 +1351,7 @@ public class TestTxnCommands2 { return sb.toString(); } - private List<String> runStatementOnDriver(String stmt) throws Exception { + protected List<String> runStatementOnDriver(String stmt) throws Exception { CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr);
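The visibility changes in this test (protected fields, setUpWithTableProperties, and the *ForVariousTblProperties variants) are clearly meant to let a subclass re-run the same scenarios against tables created with different transactional_properties. The actual subclass is not part of this diff; the sketch below is only a guess at what it might look like, and the 'transactional_properties'='default' value is an assumption (the accepted values are defined by AcidUtils.AcidOperationalProperties.parseString, which is not shown here):

package org.apache.hadoop.hive.ql;

import org.junit.Before;
import org.junit.Test;

// Hypothetical subclass exercising TestTxnCommands2 against a split-update ACID table.
public class TestTxnCommands2WithSplitUpdateSketch extends TestTxnCommands2 {

  // Assumed to enable the split-update code path added in this patch.
  private static final String SPLIT_UPDATE_PROPS =
      "'transactional'='true','transactional_properties'='default'";

  @Override
  @Before
  public void setUp() throws Exception {
    setUpWithTableProperties(SPLIT_UPDATE_PROPS);
  }

  @Test
  public void testInitiatorWithMultipleFailedCompactionsWithSplitUpdate() throws Exception {
    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(SPLIT_UPDATE_PROPS);
  }

  @Test
  public void testACIDwithSchemaEvolutionAndCompactionWithSplitUpdate() throws Exception {
    testACIDwithSchemaEvolutionForVariousTblProperties(SPLIT_UPDATE_PROPS);
  }
}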
