HIVE-11030 - Enhance storage layer to create one delta file per write (Eugene Koifman, reviewed by Alan Gates)
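For readers skimming the patch below, here is a minimal sketch of the per-statement delta directory naming it introduces. It assumes the "%07d" transaction-id format (AcidUtils.DELTA_DIGITS) and the new "%04d" statement-id format (STATEMENT_DIGITS) from the diff; the class name DeltaNameSketch and its main() are hypothetical illustration only, not code from the commit.

    // Sketch of the delta dir names produced after HIVE-11030; mirrors
    // AcidUtils.deltaSubdir(min, max) and the new deltaSubdir(min, max, statementId).
    public class DeltaNameSketch {
      // pre-1.3.0 style: delta_<minTxn>_<maxTxn>
      static String deltaSubdir(long min, long max) {
        return "delta_" + String.format("%07d", min) + "_" + String.format("%07d", max);
      }
      // 1.3.0+ style: one dir per write statement within the transaction
      static String deltaSubdir(long min, long max, int stmtId) {
        return deltaSubdir(min, max) + "_" + String.format("%04d", stmtId);
      }
      public static void main(String[] args) {
        // Two INSERT statements in the same open transaction (txn range 100..200)
        // now land in separate delta directories instead of colliding:
        System.out.println(deltaSubdir(100, 200, 0)); // delta_0000100_0000200_0000
        System.out.println(deltaSubdir(100, 200, 1)); // delta_0000100_0000200_0001
        // Old-style name, still used when statementId == -1 (e.g. by the compactor):
        System.out.println(deltaSubdir(100, 200));    // delta_0000100_0000200
      }
    }

In the patch itself this is wired through AcidOutputFormat.Options.statementId()/finalDestination(), AcidUtils.createFilename() (which falls back to the old delta_xxxxxxx_yyyyyyy form when statementId is -1, as CompactorMR requests), and DbTxnManager, which assigns a monotonically increasing statement id per write statement of an open transaction.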
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c30ab468 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c30ab468 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c30ab468 Branch: refs/heads/branch-1 Commit: c30ab4686cbfe73c3cf4552fa7e07c8ded3b4b17 Parents: 16d1b74 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Mon Jul 13 09:31:17 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Mon Jul 13 09:31:17 2015 -0700 ---------------------------------------------------------------------- .../streaming/AbstractRecordWriter.java | 4 +- .../java/org/apache/hadoop/hive/ql/Driver.java | 1 + .../hadoop/hive/ql/io/AcidInputFormat.java | 60 +++++++- .../hadoop/hive/ql/io/AcidOutputFormat.java | 49 +++++- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 152 +++++++++++++++---- .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 19 +-- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 20 +-- .../hadoop/hive/ql/io/orc/OrcNewSplit.java | 13 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 66 ++++++-- .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +++++++ .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 16 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 20 ++- .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 4 + .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 3 + .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 +- .../hadoop/hive/ql/plan/FileSinkDesc.java | 27 +++- .../hive/ql/txn/compactor/CompactorMR.java | 4 +- .../hive/ql/exec/TestFileSinkOperator.java | 3 +- .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 73 ++++++++- .../hive/ql/io/orc/TestInputOutputFormat.java | 13 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 57 ++++--- .../hive/ql/io/orc/TestOrcRecordUpdater.java | 6 +- .../hive/ql/txn/compactor/CompactorTest.java | 20 ++- .../hive/ql/txn/compactor/TestCleaner.java | 8 +- .../hive/ql/txn/compactor/TestCleaner2.java | 14 ++ .../hive/ql/txn/compactor/TestInitiator.java | 4 + .../hive/ql/txn/compactor/TestWorker.java | 49 +++--- .../hive/ql/txn/compactor/TestWorker2.java | 16 ++ 28 files changed, 642 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index ed46bca..c959222 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -143,7 +143,9 @@ abstract class AbstractRecordWriter implements RecordWriter { .inspector(getSerde().getObjectInspector()) .bucket(bucketId) .minimumTransactionId(minTxnId) - .maximumTransactionId(maxTxnID)); + .maximumTransactionId(maxTxnID) + .statementId(-1) + .finalDestination(partitionPath)); } catch (SerDeException e) { throw new SerializationError("Failed to get object inspector from Serde " + getSerde().getClass().getName(), e); http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index e04165b..d161503 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -986,6 +986,7 @@ public class Driver implements CommandProcessor { if (acidSinks != null) { for (FileSinkDesc desc : acidSinks) { desc.setTransactionId(txnId); + desc.setStatementId(txnMgr.getStatementId()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index e1d2395..24506b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -22,13 +22,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * The interface required for input formats that what to support ACID @@ -62,7 +68,7 @@ import java.io.IOException; * <li>New format - * <pre> * $partition/base_$tid/$bucket - * delta_$tid_$tid/$bucket + * delta_$tid_$tid_$stid/$bucket * </pre></li> * </ul> * <p> @@ -71,6 +77,8 @@ import java.io.IOException; * stored sorted by the original transaction id (ascending), bucket (ascending), * row id (ascending), and current transaction id (descending). Thus the files * can be merged by advancing through the files in parallel. + * The stid is unique id (within the transaction) of the statement that created + * this delta file. * <p> * The base files include all transactions from the beginning of time * (transaction id 0) to the transaction in the directory name. Delta @@ -91,7 +99,7 @@ import java.io.IOException; * For row-at-a-time processing, KEY can conveniently pass RowId into the operator * pipeline. For vectorized execution the KEY could perhaps represent a range in the batch. * Since {@link org.apache.hadoop.hive.ql.io.orc.OrcInputFormat} is declared to return - * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidRecordReader} is defined + * {@code NullWritable} key, {@link org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader} is defined * to provide access to the RowId. Other implementations of AcidInputFormat can use either * mechanism. 
* </p> @@ -101,6 +109,54 @@ import java.io.IOException; public interface AcidInputFormat<KEY extends WritableComparable, VALUE> extends InputFormat<KEY, VALUE>, InputFormatChecker { + static final class DeltaMetaData implements Writable { + private long minTxnId; + private long maxTxnId; + private List<Integer> stmtIds; + + public DeltaMetaData() { + this(0,0,null); + } + DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) { + this.minTxnId = minTxnId; + this.maxTxnId = maxTxnId; + this.stmtIds = stmtIds; + } + long getMinTxnId() { + return minTxnId; + } + long getMaxTxnId() { + return maxTxnId; + } + List<Integer> getStmtIds() { + return stmtIds; + } + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(minTxnId); + out.writeLong(maxTxnId); + out.writeInt(stmtIds.size()); + if(stmtIds == null) { + return; + } + for(Integer id : stmtIds) { + out.writeInt(id); + } + } + @Override + public void readFields(DataInput in) throws IOException { + minTxnId = in.readLong(); + maxTxnId = in.readLong(); + int numStatements = in.readInt(); + if(numStatements <= 0) { + return; + } + stmtIds = new ArrayList<>(); + for(int i = 0; i < numStatements; i++) { + stmtIds.add(in.readInt()); + } + } + } /** * Options for controlling the record readers. */ http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 0d537e1..dd90a95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -39,7 +39,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO /** * Options to control how the files are written */ - public static class Options { + public static class Options implements Cloneable { private final Configuration configuration; private FileSystem fs; private ObjectInspector inspector; @@ -53,7 +53,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id - + //unique within a transaction + private int statementId = 0; + private Path finalDestination; /** * Create the options object. * @param conf Use the given configuration @@ -63,6 +65,18 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO } /** + * shallow clone + */ + @Override + public Options clone() { + try { + return (Options)super.clone(); + } + catch(CloneNotSupportedException ex) { + throw new RuntimeException("clone() not properly implemented: " + ex.getMessage(), ex); + } + } + /** * Use the given ObjectInspector for each record written. * @param inspector the inspector to use. * @return this @@ -185,6 +199,31 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO return this; } + /** + * @since 1.3.0 + * This can be set to -1 to make the system generate old style (delta_xxxx_yyyy) file names. + * This is primarily needed for testing to make sure 1.3 code can still read files created + * by older code. Also used by Comactor. 
+ */ + public Options statementId(int id) { + if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) { + throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId); + } + if(id < -1) { + throw new IllegalArgumentException("Illegal statementId value: " + id); + } + this.statementId = id; + return this; + } + /** + * @param p where the data for this operation will eventually end up; + * basically table or partition directory in FS + */ + public Options finalDestination(Path p) { + this.finalDestination = p; + return this; + } + public Configuration getConfiguration() { return configuration; } @@ -236,6 +275,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO boolean getOldStyle() { return oldStyle; } + public int getStatementId() { + return statementId; + } + public Path getFinalDestination() { + return finalDestination; + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 2214733..c7e0780 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -67,6 +67,15 @@ public class AcidUtils { }; public static final String BUCKET_DIGITS = "%05d"; public static final String DELTA_DIGITS = "%07d"; + /** + * 10K statements per tx. Probably overkill ... since that many delta files + * would not be good for performance + */ + public static final String STATEMENT_DIGITS = "%04d"; + /** + * This must be in sync with {@link #STATEMENT_DIGITS} + */ + public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}"); public static final PathFilter originalBucketFilter = new PathFilter() { @@ -79,7 +88,7 @@ public class AcidUtils { private AcidUtils() { // NOT USED } - private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName()); + private static final Log LOG = LogFactory.getLog(AcidUtils.class); private static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+"); @@ -104,12 +113,23 @@ public class AcidUtils { BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket)); } - private static String deltaSubdir(long min, long max) { + /** + * This is format of delta dir name prior to Hive 1.3.x + */ + public static String deltaSubdir(long min, long max) { return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + String.format(DELTA_DIGITS, max); } /** + * Each write statement in a transaction creates its own delta dir. + * @since 1.3.x + */ + public static String deltaSubdir(long min, long max, int statementId) { + return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); + } + + /** * Create a filename for a bucket file. 
* @param directory the partition directory * @param options the options for writing the bucket @@ -124,9 +144,15 @@ public class AcidUtils { } else if (options.isWritingBase()) { subdir = BASE_PREFIX + String.format(DELTA_DIGITS, options.getMaximumTransactionId()); + } else if(options.getStatementId() == -1) { + //when minor compaction runs, we collapse per statement delta files inside a single + //transaction so we no longer need a statementId in the file name + subdir = deltaSubdir(options.getMinimumTransactionId(), + options.getMaximumTransactionId()); } else { subdir = deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + options.getMaximumTransactionId(), + options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucket()); } @@ -214,14 +240,24 @@ public class AcidUtils { } public static class ParsedDelta implements Comparable<ParsedDelta> { - final long minTransaction; - final long maxTransaction; - final FileStatus path; + private final long minTransaction; + private final long maxTransaction; + private final FileStatus path; + //-1 is for internal (getAcidState()) purposes and means the delta dir + //had no statement ID + private final int statementId; + /** + * for pre 1.3.x delta files + */ ParsedDelta(long min, long max, FileStatus path) { + this(min, max, path, -1); + } + ParsedDelta(long min, long max, FileStatus path, int statementId) { this.minTransaction = min; this.maxTransaction = max; this.path = path; + this.statementId = statementId; } public long getMinTransaction() { @@ -236,6 +272,16 @@ public class AcidUtils { return path.getPath(); } + public int getStatementId() { + return statementId == -1 ? 0 : statementId; + } + + /** + * Compactions (Major/Minor) merge deltas/bases but delete of old files + * happens in a different process; thus it's possible to have bases/deltas with + * overlapping txnId boundaries. The sort order helps figure out the "best" set of files + * to use to get data. + */ @Override public int compareTo(ParsedDelta parsedDelta) { if (minTransaction != parsedDelta.minTransaction) { @@ -250,7 +296,22 @@ public class AcidUtils { } else { return -1; } - } else { + } + else if(statementId != parsedDelta.statementId) { + /** + * We want deltas after minor compaction (w/o statementId) to sort + * earlier so that getAcidState() considers compacted files (into larger ones) obsolete + * Before compaction, include deltas with all statementIds for a given txnId + * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory} + */ + if(statementId < parsedDelta.statementId) { + return -1; + } + else { + return 1; + } + } + else { return path.compareTo(parsedDelta.path); } } @@ -271,46 +332,72 @@ public class AcidUtils { /** * Convert the list of deltas into an equivalent list of begin/end - * transaction id pairs. + * transaction id pairs. Assumes {@code deltas} is sorted. 
* @param deltas * @return the list of transaction ids to serialize */ - public static List<Long> serializeDeltas(List<ParsedDelta> deltas) { - List<Long> result = new ArrayList<Long>(deltas.size() * 2); - for(ParsedDelta delta: deltas) { - result.add(delta.minTransaction); - result.add(delta.maxTransaction); + public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) { + List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size()); + AcidInputFormat.DeltaMetaData last = null; + for(ParsedDelta parsedDelta : deltas) { + if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { + last.getStmtIds().add(parsedDelta.getStatementId()); + continue; + } + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>()); + result.add(last); + if(parsedDelta.statementId >= 0) { + last.getStmtIds().add(parsedDelta.getStatementId()); + } } return result; } /** * Convert the list of begin/end transaction id pairs to a list of delta - * directories. + * directories. Note that there may be multiple delta files for the exact same txn range starting + * with 1.3.x; + * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} * @param root the root directory * @param deltas list of begin/end transaction id pairs * @return the list of delta paths */ - public static Path[] deserializeDeltas(Path root, List<Long> deltas) { - int deltaSize = deltas.size() / 2; - Path[] result = new Path[deltaSize]; - for(int i = 0; i < deltaSize; ++i) { - result[i] = new Path(root, deltaSubdir(deltas.get(i * 2), - deltas.get(i * 2 + 1))); + public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException { + List<Path> results = new ArrayList<Path>(deltas.size()); + for(AcidInputFormat.DeltaMetaData dmd : deltas) { + if(dmd.getStmtIds().isEmpty()) { + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); + continue; + } + for(Integer stmtId : dmd.getStmtIds()) { + results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); + } } - return result; + return results.toArray(new Path[results.size()]); } - static ParsedDelta parseDelta(FileStatus path) { - String filename = path.getPath().getName(); + private static ParsedDelta parseDelta(FileStatus path) { + ParsedDelta p = parsedDelta(path.getPath()); + return new ParsedDelta(p.getMinTransaction(), + p.getMaxTransaction(), path, p.statementId); + } + public static ParsedDelta parsedDelta(Path deltaDir) { + String filename = deltaDir.getName(); if (filename.startsWith(DELTA_PREFIX)) { String rest = filename.substring(DELTA_PREFIX.length()); int split = rest.indexOf('_'); + int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId long min = Long.parseLong(rest.substring(0, split)); - long max = Long.parseLong(rest.substring(split + 1)); - return new ParsedDelta(min, max, path); + long max = split2 == -1 ? 
+ Long.parseLong(rest.substring(split + 1)) : + Long.parseLong(rest.substring(split + 1, split2)); + if(split2 == -1) { + return new ParsedDelta(min, max, null); + } + int statementId = Integer.parseInt(rest.substring(split2 + 1)); + return new ParsedDelta(min, max, null, statementId); } - throw new IllegalArgumentException(path + " does not start with " + + throw new IllegalArgumentException(deltaDir + " does not start with " + DELTA_PREFIX); } @@ -407,15 +494,24 @@ public class AcidUtils { Collections.sort(working); long current = bestBaseTxn; + int lastStmtId = -1; for(ParsedDelta next: working) { if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != - ValidTxnList.RangeResponse.NONE) { + ValidTxnList.RangeResponse.NONE) { deltas.add(next); current = next.maxTransaction; + lastStmtId = next.statementId; } - } else { + } + else if(next.maxTransaction == current && lastStmtId >= 0) { + //make sure to get all deltas within a single transaction; multi-statement txn + //generate multiple delta files with the same txnId range + //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete + deltas.add(next); + } + else { obsolete.add(next.path); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 7ad5aa0..50ba740 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -297,31 +297,32 @@ public final class HiveFileFormatUtils { // TODO not 100% sure about this. This call doesn't set the compression type in the conf // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not // sure if this is correct or not. 
- return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(), - bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum); + return getRecordUpdater(jc, acidOutputFormat, + bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum, conf); } private static RecordUpdater getRecordUpdater(JobConf jc, AcidOutputFormat<?, ?> acidOutputFormat, - boolean isCompressed, - long txnId, int bucket, ObjectInspector inspector, Properties tableProp, Path outPath, Reporter reporter, - int rowIdColNum) throws IOException { + int rowIdColNum, + FileSinkDesc conf) throws IOException { return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc) - .isCompressed(isCompressed) + .isCompressed(conf.getCompressed()) .tableProperties(tableProp) .reporter(reporter) .writingBase(false) - .minimumTransactionId(txnId) - .maximumTransactionId(txnId) + .minimumTransactionId(conf.getTransactionId()) + .maximumTransactionId(conf.getTransactionId()) .bucket(bucket) .inspector(inspector) - .recordIdColumn(rowIdColNum)); + .recordIdColumn(rowIdColNum) + .statementId(conf.getStatementId()) + .finalDestination(conf.getDestPath())); } public static PartitionDesc getPartitionDescFromPathRecursively( http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 7346bc4..6fe0044 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -443,13 +443,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final FileStatus file; private final FileInfo fileInfo; private final boolean isOriginal; - private final List<Long> deltas; + private final List<DeltaMetaData> deltas; private final boolean hasBase; SplitInfo(Context context, FileSystem fs, FileStatus file, FileInfo fileInfo, boolean isOriginal, - List<Long> deltas, + List<DeltaMetaData> deltas, boolean hasBase, Path dir, boolean[] covered) throws IOException { super(dir, context.numBuckets, deltas, covered); this.context = context; @@ -471,12 +471,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, FileSystem fs; List<FileStatus> files; boolean isOriginal; - List<Long> deltas; + List<DeltaMetaData> deltas; Path dir; boolean[] covered; public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children, - boolean isOriginal, List<Long> deltas, boolean[] covered) { + boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) { this.context = context; this.dir = dir; this.fs = fs; @@ -547,14 +547,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, static final class BISplitStrategy extends ACIDSplitStrategy { List<FileStatus> fileStatuses; boolean isOriginal; - List<Long> deltas; + List<DeltaMetaData> deltas; FileSystem fs; Context context; Path dir; public BISplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> fileStatuses, boolean isOriginal, - List<Long> deltas, boolean[] covered) { + List<DeltaMetaData> deltas, boolean[] covered) { super(dir, context.numBuckets, deltas, covered); this.context = context; this.fileStatuses = fileStatuses; @@ -591,11 +591,11 @@ public class 
OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, */ static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> { Path dir; - List<Long> deltas; + List<DeltaMetaData> deltas; boolean[] covered; int numBuckets; - public ACIDSplitStrategy(Path dir, int numBuckets, List<Long> deltas, boolean[] covered) { + public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) { this.dir = dir; this.numBuckets = numBuckets; this.deltas = deltas; @@ -644,7 +644,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, final SplitStrategy splitStrategy; AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList); - List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); + List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); Path base = dirInfo.getBaseDirectory(); List<FileStatus> original = dirInfo.getOriginalFiles(); boolean[] covered = new boolean[context.numBuckets]; @@ -722,7 +722,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private Metadata metadata; private List<OrcProto.Type> types; private final boolean isOriginal; - private final List<Long> deltas; + private final List<DeltaMetaData> deltas; private final boolean hasBase; private OrcFile.WriterVersion writerVersion; private long projColsUncompressedSize; http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java index da23544..b58c880 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -37,7 +38,7 @@ public class OrcNewSplit extends FileSplit { private boolean hasFooter; private boolean isOriginal; private boolean hasBase; - private final List<Long> deltas = new ArrayList<Long>(); + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>(); private OrcFile.WriterVersion writerVersion; protected OrcNewSplit(){ @@ -67,8 +68,8 @@ public class OrcNewSplit extends FileSplit { (hasFooter ? 
OrcSplit.FOOTER_FLAG : 0); out.writeByte(flags); out.writeInt(deltas.size()); - for(Long delta: deltas) { - out.writeLong(delta); + for(AcidInputFormat.DeltaMetaData delta: deltas) { + delta.write(out); } if (hasFooter) { // serialize FileMetaInfo fields @@ -101,7 +102,9 @@ public class OrcNewSplit extends FileSplit { deltas.clear(); int numDeltas = in.readInt(); for(int i=0; i < numDeltas; i++) { - deltas.add(in.readLong()); + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData(); + dmd.readFields(in); + deltas.add(dmd); } if (hasFooter) { // deserialize FileMetaInfo fields @@ -137,7 +140,7 @@ public class OrcNewSplit extends FileSplit { return hasBase; } - public List<Long> getDeltas() { + public List<AcidInputFormat.DeltaMetaData> getDeltas() { return deltas; } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 728118a..2f11611 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -72,41 +72,55 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ /** * A RecordIdentifier extended with the current transaction id. This is the * key of our merge sort with the originalTransaction, bucket, and rowId - * ascending and the currentTransaction descending. This means that if the + * ascending and the currentTransaction, statementId descending. This means that if the * reader is collapsing events to just the last update, just the first * instance of each record is required. 
*/ final static class ReaderKey extends RecordIdentifier{ private long currentTransactionId; + private int statementId;//sort on this descending, like currentTransactionId public ReaderKey() { - this(-1, -1, -1, -1); + this(-1, -1, -1, -1, 0); } public ReaderKey(long originalTransaction, int bucket, long rowId, long currentTransactionId) { + this(originalTransaction, bucket, rowId, currentTransactionId, 0); + } + /** + * @param statementId - set this to 0 if N/A + */ + public ReaderKey(long originalTransaction, int bucket, long rowId, + long currentTransactionId, int statementId) { super(originalTransaction, bucket, rowId); this.currentTransactionId = currentTransactionId; + this.statementId = statementId; } @Override public void set(RecordIdentifier other) { super.set(other); currentTransactionId = ((ReaderKey) other).currentTransactionId; + statementId = ((ReaderKey) other).statementId; } public void setValues(long originalTransactionId, int bucket, long rowId, - long currentTransactionId) { + long currentTransactionId, + int statementId) { setValues(originalTransactionId, bucket, rowId); this.currentTransactionId = currentTransactionId; + this.statementId = statementId; } @Override public boolean equals(Object other) { return super.equals(other) && - currentTransactionId == ((ReaderKey) other).currentTransactionId; + currentTransactionId == ((ReaderKey) other).currentTransactionId + && statementId == ((ReaderKey) other).statementId//consistent with compareTo() + ; } @Override @@ -118,6 +132,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ if (currentTransactionId != oth.currentTransactionId) { return currentTransactionId < oth.currentTransactionId ? +1 : -1; } + if(statementId != oth.statementId) { + return statementId < oth.statementId ? +1 : -1; + } } else { return -1; } @@ -125,6 +142,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ return sup; } + /** + * This means 1 txn modified the same row more than once + */ + private boolean isSameRow(ReaderKey other) { + return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; + } + public long getCurrentTransactionId() { return currentTransactionId; } @@ -142,7 +166,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ public String toString() { return "{originalTxn: " + getTransactionId() + ", bucket: " + getBucketId() + ", row: " + getRowId() + ", currentTxn: " + - currentTransactionId + "}"; + currentTransactionId + ", statementId: "+ statementId + "}"; } } @@ -159,6 +183,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ final ReaderKey key; final RecordIdentifier maxKey; final int bucket; + private final int statementId; /** * Create a reader that reads from the first key larger than minKey to any @@ -170,17 +195,19 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * @param maxKey only return keys less than or equal to maxKey if it is * non-null * @param options options to provide to read the rows. 
+ * @param statementId id of SQL statement within a transaction * @throws IOException */ ReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, - ReaderImpl.Options options) throws IOException { + ReaderImpl.Options options, int statementId) throws IOException { this.reader = reader; this.key = key; this.maxKey = maxKey; this.bucket = bucket; // TODO use stripe statistics to jump over stripes recordReader = reader.rowsOptions(options); + this.statementId = statementId; // advance the reader until we reach the minimum key do { next(nextRecord); @@ -195,7 +222,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord), OrcRecordUpdater.getBucket(nextRecord), OrcRecordUpdater.getRowId(nextRecord), - OrcRecordUpdater.getCurrentTransaction(nextRecord)); + OrcRecordUpdater.getCurrentTransaction(nextRecord), + statementId); // if this record is larger than maxKey, we need to stop if (maxKey != null && key.compareRow(maxKey) > 0) { @@ -223,7 +251,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ OriginalReaderPair(ReaderKey key, Reader reader, int bucket, RecordIdentifier minKey, RecordIdentifier maxKey, Reader.Options options) throws IOException { - super(key, reader, bucket, minKey, maxKey, options); + super(key, reader, bucket, minKey, maxKey, options, 0); } @Override @@ -263,7 +291,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ nextRecord.setFieldValue(OrcRecordUpdater.ROW, recordReader.next(OrcRecordUpdater.getRow(next))); } - key.setValues(0L, bucket, nextRowId, 0L); + key.setValues(0L, bucket, nextRowId, 0L, 0); if (maxKey != null && key.compareRow(maxKey) > 0) { if (LOG.isDebugEnabled()) { LOG.debug("key " + key + " > maxkey " + maxKey); @@ -415,7 +443,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; - // modify the optins to reflect the event instead of the base row + // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); if (reader == null) { baseReader = null; @@ -438,7 +466,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ options); } else { pair = new ReaderPair(key, reader, bucket, minKey, maxKey, - eventOptions); + eventOptions, 0); } // if there is at least one record, put it in the map @@ -458,13 +486,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ for(Path delta: deltaDirectory) { ReaderKey key = new ReaderKey(); Path deltaFile = AcidUtils.createBucketFile(delta, bucket); + AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); FileSystem fs = deltaFile.getFileSystem(conf); long length = getLastFlushLength(fs, deltaFile); if (length != -1 && fs.exists(deltaFile)) { Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length)); ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey, - maxKey, eventOptions); + maxKey, eventOptions, deltaDir.getStatementId()); if (deltaPair.nextRecord != null) { readers.put(key, deltaPair); } @@ -580,9 +609,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ continue; } + /*for multi-statement txns, you may have multiple events for the same + * row in the same 
(current) transaction. We want to collapse these to just the last one + * regardless whether we are minor compacting. Consider INSERT/UPDATE/UPDATE of the + * same row in the same txn. There is no benefit passing along anything except the last + * event. If we did want to pass it along, we'd have to include statementId in the row + * returned so that compaction could write it out or make minor minor compaction understand + * how to write out delta files in delta_xxx_yyy_stid format. There doesn't seem to be any + * value in this.*/ + boolean isSameRow = prevKey.isSameRow((ReaderKey)recordIdentifier); // if we are collapsing, figure out if this is a new row - if (collapse) { - keysSame = prevKey.compareRow(recordIdentifier) == 0; + if (collapse || isSameRow) { + keysSame = (collapse && prevKey.compareRow(recordIdentifier) == 0) || (isSameRow); if (!keysSame) { prevKey.set(recordIdentifier); } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index b576496..e4651b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -89,6 +89,7 @@ public class OrcRecordUpdater implements RecordUpdater { private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; + private long rowIdOffset = 0; // This records how many rows have been inserted or deleted. It is separate from insertedRows // because that is monotonically increasing to give new unique row ids. private long rowCountDelta = 0; @@ -263,6 +264,41 @@ public class OrcRecordUpdater implements RecordUpdater { item.setFieldValue(ROW_ID, rowId); } + /** + * To handle multiple INSERT... statements in a single transaction, we want to make sure + * to generate unique {@code rowId} for all inserted rows of the transaction. + * @return largest rowId created by previous statements (maybe 0) + * @throws IOException + */ + private long findRowIdOffsetForInsert() throws IOException { + /* + * 1. need to know bucket we are writing to + * 2. need to know which delta dir it's in + * Then, + * 1. find the same bucket file in previous delta dir for this txn + * 2. read the footer and get AcidStats which has insert count + * 2.1 if AcidStats.inserts>0 done + * else go to previous delta file + * For example, consider insert/update/insert case...*/ + if(options.getStatementId() <= 0) { + return 0;//there is only 1 statement in this transaction (so far) + } + for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) { + Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt)); + if(!fs.exists(matchingBucket)) { + continue; + } + Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration())); + //no close() on Reader?! + AcidStats acidStats = parseAcidStats(reader); + if(acidStats.inserts > 0) { + return acidStats.inserts; + } + } + //if we got here, we looked at all delta files in this txn, prior to current statement and didn't + //find any inserts... 
+ return 0; + } // Find the record identifier column (if there) and return a possibly new ObjectInspector that // will strain out the record id for the underlying writer. private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { @@ -304,6 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater { recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); } + else if(operation == INSERT_OPERATION) { + rowId += rowIdOffset; + } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); @@ -315,6 +354,9 @@ public class OrcRecordUpdater implements RecordUpdater { public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; + //this method is almost no-op in hcatalog.streaming case since statementId == 0 is + //always true in that case + rowIdOffset = findRowIdOffsetForInsert(); } addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); rowCountDelta++; @@ -407,6 +449,22 @@ public class OrcRecordUpdater implements RecordUpdater { } return result; } + /** + * {@link KeyIndexBuilder} creates these + */ + static AcidStats parseAcidStats(Reader reader) { + String statsSerialized; + try { + ByteBuffer val = + reader.getMetadataValue(OrcRecordUpdater.ACID_STATS) + .duplicate(); + statsSerialized = utf8Decoder.decode(val).toString(); + } catch (CharacterCodingException e) { + throw new IllegalArgumentException("Bad string encoding for " + + OrcRecordUpdater.ACID_STATS, e); + } + return new AcidStats(statsSerialized); + } static class KeyIndexBuilder implements OrcFile.WriterCallback { StringBuilder lastKey = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 0c7dd40..8cf4cc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; @@ -41,7 +43,7 @@ public class OrcSplit extends FileSplit { private boolean hasFooter; private boolean isOriginal; private boolean hasBase; - private final List<Long> deltas = new ArrayList<Long>(); + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>(); private OrcFile.WriterVersion writerVersion; private long projColsUncompressedSize; @@ -58,7 +60,7 @@ public class OrcSplit extends FileSplit { public OrcSplit(Path path, long offset, long length, String[] hosts, ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase, - List<Long> deltas, long projectedDataSize) { + List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) { super(path, offset, length, hosts); this.fileMetaInfo = fileMetaInfo; hasFooter = this.fileMetaInfo != null; @@ -78,8 +80,8 @@ public class OrcSplit extends FileSplit { (hasFooter ? 
FOOTER_FLAG : 0); out.writeByte(flags); out.writeInt(deltas.size()); - for(Long delta: deltas) { - out.writeLong(delta); + for(AcidInputFormat.DeltaMetaData delta: deltas) { + delta.write(out); } if (hasFooter) { // serialize FileMetaInfo fields @@ -112,7 +114,9 @@ public class OrcSplit extends FileSplit { deltas.clear(); int numDeltas = in.readInt(); for(int i=0; i < numDeltas; i++) { - deltas.add(in.readLong()); + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData(); + dmd.readFields(in); + deltas.add(dmd); } if (hasFooter) { // deserialize FileMetaInfo fields @@ -148,7 +152,7 @@ public class OrcSplit extends FileSplit { return hasBase; } - public List<Long> getDeltas() { + public List<AcidInputFormat.DeltaMetaData> getDeltas() { return deltas; } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index f8fff1a..445f606 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -52,6 +52,14 @@ public class DbTxnManager extends HiveTxnManagerImpl { private DbLockManager lockMgr = null; private IMetaStoreClient client = null; private long txnId = 0; + /** + * assigns a unique monotonically increasing ID to each statement + * which is part of an open transaction. This is used by storage + * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}) + * to keep apart multiple writes of the same data within the same transaction + * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options} + */ + private int statementId = -1; DbTxnManager() { } @@ -69,6 +77,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { init(); try { txnId = client.openTxn(user); + statementId = 0; LOG.debug("Opened txn " + txnId); return txnId; } catch (TException e) { @@ -222,7 +231,10 @@ public class DbTxnManager extends HiveTxnManagerImpl { return null; } - List<HiveLock> locks = new ArrayList<HiveLock>(1); + List<HiveLock> locks = new ArrayList<HiveLock>(1); + if(txnId > 0) { + statementId++; + } LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); ctx.setHiveLocks(locks); return lockState; @@ -249,6 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { e); } finally { txnId = 0; + statementId = -1; } } @@ -270,6 +283,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { e); } finally { txnId = 0; + statementId = -1; } } @@ -361,5 +375,9 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } } + @Override + public int getStatementId() { + return statementId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 21ab8ee..1906982 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -54,6 +54,10 @@ class DummyTxnManager extends HiveTxnManagerImpl { } @Override + public int getStatementId() { + return 0; + } + @Override 
public HiveLockManager getLockManager() throws LockException { if (lockMgr == null) { boolean supportConcurrency = http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 2dd0c7d..6c3dc33 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -127,4 +127,7 @@ public interface HiveTxnManager { * @return true if this transaction manager does ACID */ boolean supportsAcid(); + + int getStatementId(); + } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1ce98b8..5719cf4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6605,7 +6605,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), - dpCtx); + dpCtx, + dest_path); // If this is an insert, update, or delete on an ACID table then mark that so the // FileSinkOperator knows how to properly write to it. http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index bb6cee5..f73b502 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -92,16 +92,21 @@ public class FileSinkDesc extends AbstractOperatorDesc { // Record what type of write this is. Default is non-ACID (ie old style). 
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; private long txnId = 0; // transaction id for this operation + private int statementId = -1; private transient Table table; + private Path destPath; public FileSinkDesc() { } + /** + * @param destPath - the final destination for data + */ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, - final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx) { + final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -114,6 +119,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { this.partitionCols = partitionCols; this.dpCtx = dpCtx; this.dpSortState = DPSortState.NONE; + this.destPath = destPath; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -135,7 +141,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx); + partitionCols, dpCtx, destPath); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -231,9 +237,6 @@ public class FileSinkDesc extends AbstractOperatorDesc { return temporary; } - /** - * @param totalFiles the totalFiles to set - */ public void setTemporary(boolean temporary) { this.temporary = temporary; } @@ -438,11 +441,23 @@ public class FileSinkDesc extends AbstractOperatorDesc { public void setTransactionId(long id) { txnId = id; } - public long getTransactionId() { return txnId; } + public void setStatementId(int id) { + statementId = id; + } + /** + * See {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options#statementId(int)} + */ + public int getStatementId() { + return statementId; + } + public Path getDestPath() { + return destPath; + } + public Table getTable() { return table; } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index c5f2d4d..6c77ba4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -545,7 +545,9 @@ public class CompactorMR { .reporter(reporter) .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE)) .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)) - .bucket(bucket); + .bucket(bucket) + .statementId(-1);//setting statementId == -1 makes compacted delta files use + //delta_xxxx_yyyy format // Instantiate the underlying output format @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index 
e400778..c6ae030 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -303,7 +303,8 @@ public class TestFileSinkOperator { Map<String, String> partColNames = new HashMap<String, String>(1); partColNames.put(PARTCOL_NAME, PARTCOL_NAME); dpCtx.setInputToDPCols(partColNames); - desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx); + //todo: does this need the finalDestination? + desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null); } else { desc = new FileSinkDesc(basePath, tableDesc, false); } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 1e3df34..f8ded12 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -46,17 +46,23 @@ public class TestAcidUtils { AcidUtils.createFilename(p, options).toString()); options.bucket(123); assertEquals("/tmp/00123_0", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); options.bucket(23) .minimumTransactionId(100) .maximumTransactionId(200) .writingBase(true) .setOldStyle(false); assertEquals("/tmp/base_0000200/bucket_00023", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); options.writingBase(false); + assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.statementId(-1); assertEquals("/tmp/delta_0000100_0000200/bucket_00023", - AcidUtils.createFilename(p, options).toString()); + AcidUtils.createFilename(p, options).toString()); + options.statementId(7); + assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023", + AcidUtils.createFilename(p, options).toString()); } @Test @@ -236,7 +242,6 @@ public class TestAcidUtils { new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]), - new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = @@ -254,6 +259,45 @@ public class TestAcidUtils { assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString()); } + /** + * Hive 1.3.0 delta dir naming scheme which supports multi-statement txns + * @throws Exception + */ + @Test + public void testOverlapingDelta2() throws Exception { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_0000063_63_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_000062_62_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_000062_62_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_00061_61_0/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60_1/bucket_0", 500, new byte[0]), + new 
MockFile("mock:/tbl/part1/delta_0060_60_4/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_0060_60_7/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_058_58/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:")); + assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); + List<FileStatus> obsolete = dir.getObsolete(); + assertEquals(5, obsolete.size()); + assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString()); + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories(); + assertEquals(5, delts.size()); + assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_00061_61_0", delts.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_000062_62_0", delts.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_000062_62_3", delts.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_0000063_63_0", delts.get(4).getPath().toString()); + } + @Test public void deltasWithOpenTxnInRead() throws Exception { Configuration conf = new Configuration(); @@ -268,6 +312,27 @@ public class TestAcidUtils { assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString()); } + /** + * @since 1.3.0 + * @throws Exception + */ + @Test + public void deltasWithOpenTxnInRead2() throws Exception { + Configuration conf = new Configuration(); + MockFileSystem fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0])); + Path part = new MockPath(fs, "mock:/tbl/part1"); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4")); + List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories(); + assertEquals(2, delts.size()); + assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString()); + } + @Test public void deltasWithOpenTxnsNotInCompact() throws Exception { Configuration conf = new Configuration(); http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 56e5f9f..e96ab2a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -59,6 
+59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -927,7 +928,7 @@ public class TestInputOutputFormat { OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, - new ArrayList<Long>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); OrcSplit result = splitter.createSplit(0, 200, null); assertEquals(0, result.getStart()); assertEquals(200, result.getLength()); @@ -968,7 +969,7 @@ public class TestInputOutputFormat { OrcInputFormat.SplitGenerator splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, fs.getFileStatus(new Path("/a/file")), null, true, - new ArrayList<Long>(), true, null, null)); + new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); List<OrcSplit> results = splitter.call(); OrcSplit result = results.get(0); assertEquals(3, result.getStart()); @@ -990,7 +991,7 @@ public class TestInputOutputFormat { conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0); context = new OrcInputFormat.Context(conf); splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs, - fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<Long>(), + fs.getFileStatus(new Path("/a/file")), null, true, new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null)); results = splitter.call(); for(int i=0; i < stripeSizes.length; ++i) { @@ -1497,7 +1498,7 @@ public class TestInputOutputFormat { Path partDir = new Path(conf.get("mapred.input.dir")); OrcRecordUpdater writer = new OrcRecordUpdater(partDir, new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector)); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir)); for(int i=0; i < 100; ++i) { BigRow row = new BigRow(i); writer.insert(10, row); @@ -1648,7 +1649,7 @@ public class TestInputOutputFormat { // write a base file in partition 0 OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(0).inspector(inspector)); + .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir[0])); for(int i=0; i < 10; ++i) { writer.insert(10, new MyRow(i, 2 * i)); } @@ -1661,7 +1662,7 @@ public class TestInputOutputFormat { // write a delta file in partition 0 writer = new OrcRecordUpdater(partDir[0], new AcidOutputFormat.Options(conf).maximumTransactionId(10) - .writingBase(true).bucket(1).inspector(inspector)); + .writingBase(true).bucket(1).inspector(inspector).finalDestination(partDir[0])); for(int i=10; i < 20; ++i) { writer.insert(10, new MyRow(i, 2*i)); } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 
921e954..39f71f1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -62,12 +62,12 @@ import static org.junit.Assert.assertNull; public class TestOrcRawRecordMerger { private static final Log LOG = LogFactory.getLog(TestOrcRawRecordMerger.class); - +//todo: why is statementId -1? @Test public void testOrdering() throws Exception { ReaderKey left = new ReaderKey(100, 200, 1200, 300); ReaderKey right = new ReaderKey(); - right.setValues(100, 200, 1000, 200); + right.setValues(100, 200, 1000, 200,1); assertTrue(right.compareTo(left) < 0); assertTrue(left.compareTo(right) > 0); assertEquals(false, left.equals(right)); @@ -76,16 +76,16 @@ public class TestOrcRawRecordMerger { assertEquals(true, right.equals(left)); right.setRowId(2000); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4); - right.setValues(100, 2, 3, 4); + left.setValues(1, 2, 3, 4,-1); + right.setValues(100, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 4); - right.setValues(1, 100, 3, 4); + left.setValues(1, 2, 3, 4,-1); + right.setValues(1, 100, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); - left.setValues(1, 2, 3, 100); - right.setValues(1, 2, 3, 4); + left.setValues(1, 2, 3, 100,-1); + right.setValues(1, 2, 3, 4,-1); assertTrue(left.compareTo(right) < 0); assertTrue(right.compareTo(left) > 0); @@ -177,7 +177,7 @@ public class TestOrcRawRecordMerger { RecordIdentifier minKey = new RecordIdentifier(10, 20, 30); RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, - new Reader.Options()); + new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -203,7 +203,7 @@ public class TestOrcRawRecordMerger { Reader reader = createMockReader(); ReaderPair pair = new ReaderPair(key, reader, 20, null, null, - new Reader.Options()); + new Reader.Options(), 0); RecordReader recordReader = pair.recordReader; assertEquals(10, key.getTransactionId()); assertEquals(20, key.getBucketId()); @@ -489,7 +489,7 @@ public class TestOrcRawRecordMerger { // write the empty base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).writingBase(true) - .maximumTransactionId(100); + .maximumTransactionId(100).finalDestination(root); of.getRecordUpdater(root, options).close(false); ValidTxnList txnList = new ValidReadTxnList("200:"); @@ -515,6 +515,10 @@ public class TestOrcRawRecordMerger { */ @Test public void testNewBaseAndDelta() throws Exception { + testNewBaseAndDelta(false); + testNewBaseAndDelta(true); + } + private void testNewBaseAndDelta(boolean use130Format) throws Exception { final int BUCKET = 10; String[] values = new String[]{"first", "second", "third", "fourth", "fifth", "sixth", "seventh", "eighth", @@ -532,7 +536,10 @@ public class TestOrcRawRecordMerger { // write the base AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) - .inspector(inspector).bucket(BUCKET); + .inspector(inspector).bucket(BUCKET).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } RecordUpdater ru = of.getRecordUpdater(root, options.writingBase(true).maximumTransactionId(100)); for(String v: values) { @@ -554,7 +561,8 @@ public class TestOrcRawRecordMerger { 
AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList); assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory()); - assertEquals(new Path(root, "delta_0000200_0000200"), + assertEquals(new Path(root, use130Format ? + AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)), directory.getCurrentDirectories().get(0).getPath()); Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), @@ -829,7 +837,7 @@ public class TestOrcRawRecordMerger { // write a delta AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) - .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5); + .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); values = new String[]{"0.0", null, null, "1.1", null, null, null, "ignore.7"}; @@ -920,6 +928,7 @@ public class TestOrcRawRecordMerger { options.orcOptions(OrcFile.writerOptions(conf) .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) .memory(mgr)); + options.finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3", "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"}; @@ -1004,7 +1013,8 @@ public class TestOrcRawRecordMerger { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .bucket(BUCKET).inspector(inspector).filesystem(fs) - .writingBase(false).minimumTransactionId(1).maximumTransactionId(1); + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) + .finalDestination(root); RecordUpdater ru = of.getRecordUpdater(root, options); String[] values = new String[]{"a", "b", "c", "d", "e"}; for(int i=0; i < values.length; ++i) { @@ -1047,6 +1057,14 @@ public class TestOrcRawRecordMerger { */ @Test public void testRecordReaderIncompleteDelta() throws Exception { + testRecordReaderIncompleteDelta(false); + testRecordReaderIncompleteDelta(true); + } + /** + * + * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001 + */ + private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception { final int BUCKET = 1; Configuration conf = new Configuration(); OrcOutputFormat of = new OrcOutputFormat(); @@ -1063,7 +1081,10 @@ public class TestOrcRawRecordMerger { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) - .bucket(BUCKET).inspector(inspector).filesystem(fs); + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } RecordUpdater ru = of.getRecordUpdater(root, options); String[] values= new String[]{"1", "2", "3", "4", "5"}; for(int i=0; i < values.length; ++i) { @@ -1110,8 +1131,8 @@ public class TestOrcRawRecordMerger { splits = inf.getSplits(job, 1); assertEquals(2, splits.length); rr = inf.getRecordReader(splits[0], job, Reporter.NULL); - Path sideFile = new Path(root + - "/delta_0000010_0000019/bucket_00001_flush_length"); + Path sideFile = new Path(root + "/" + (use130Format ? 
AcidUtils.deltaSubdir(10,19,0) : + AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length"); assertEquals(true, fs.exists(sideFile)); assertEquals(24, fs.getFileStatus(sideFile).getLen()); http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 22bd4b9..22030b4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -97,7 +97,8 @@ public class TestOrcRecordUpdater { .minimumTransactionId(10) .maximumTransactionId(19) .inspector(inspector) - .reporter(Reporter.NULL); + .reporter(Reporter.NULL) + .finalDestination(root); RecordUpdater updater = new OrcRecordUpdater(root, options); updater.insert(11, new MyRow("first")); updater.insert(11, new MyRow("second")); @@ -197,7 +198,8 @@ public class TestOrcRecordUpdater { .maximumTransactionId(100) .inspector(inspector) .reporter(Reporter.NULL) - .recordIdColumn(1); + .recordIdColumn(1) + .finalDestination(root); RecordUpdater updater = new OrcRecordUpdater(root, options); updater.update(100, new MyRow("update", 30, 10, bucket)); updater.delete(100, new MyRow("", 60, 40, bucket)); http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 671e122..21adc9d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -241,7 +241,7 @@ public abstract class CompactorTest { return sd; } - // I can't do this with @Before because I want to be able to control when the thead starts + // I can't do this with @Before because I want to be able to control when the thread starts private void startThread(char type, boolean stopAfterOne) throws Exception { startThread(type, stopAfterOne, new AtomicBoolean()); } @@ -284,7 +284,7 @@ public abstract class CompactorTest { switch (type) { case BASE: filename = "base_" + maxTxn; break; case LENGTH_FILE: // Fall through to delta - case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break; + case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break; case LEGACY: break; // handled below } @@ -508,5 +508,21 @@ public abstract class CompactorTest { } } + /** + * in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that + * the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats + * are used since new (1.3) code has to be able to read old files. + */ + abstract boolean useHive130DeltaDirName(); + String makeDeltaDirName(long minTxnId, long maxTxnId) { + return useHive130DeltaDirName() ? 
+ AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId); + } + /** + * delta dir name after compaction + */ + String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) { + return AcidUtils.deltaSubdir(minTxnId, maxTxnId); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index ffdbb9a..0db732c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -139,7 +139,7 @@ public class TestCleaner extends CompactorTest { boolean sawBase = false, sawDelta = false; for (Path p : paths) { if (p.getName().equals("base_20")) sawBase = true; - else if (p.getName().equals("delta_21_24")) sawDelta = true; + else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true; else Assert.fail("Unexpected file " + p.getName()); } Assert.assertTrue(sawBase); @@ -177,7 +177,7 @@ public class TestCleaner extends CompactorTest { boolean sawBase = false, sawDelta = false; for (Path path : paths) { if (path.getName().equals("base_20")) sawBase = true; - else if (path.getName().equals("delta_21_24")) sawDelta = true; + else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true; else Assert.fail("Unexpected file " + path.getName()); } Assert.assertTrue(sawBase); @@ -480,4 +480,8 @@ public class TestCleaner extends CompactorTest { ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals(0, rsp.getCompactsSize()); } + @Override + boolean useHive130DeltaDirName() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java new file mode 100644 index 0000000..c637dd1 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java @@ -0,0 +1,14 @@ +package org.apache.hadoop.hive.ql.txn.compactor; + +/** + * Same as TestCleaner but tests delta file names in Hive 1.3.0 format + */ +public class TestCleaner2 extends TestCleaner { + public TestCleaner2() throws Exception { + super(); + } + @Override + boolean useHive130DeltaDirName() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/c30ab468/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 00b13de..0b0b1da 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -713,5 +713,9 @@ public class TestInitiator extends CompactorTest { List<ShowCompactResponseElement> compacts = rsp.getCompacts(); Assert.assertEquals(0, compacts.size()); } + @Override + boolean useHive130DeltaDirName() { + return false; + } }
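
A note on the two delta directory layouts exercised above: makeDeltaDirName()/useHive130DeltaDirName() switch the compactor tests between the pre-1.3.0 name delta_<minTxn>_<maxTxn> and the 1.3.0 name delta_<minTxn>_<maxTxn>_<stmtId> produced by AcidUtils.deltaSubdir(), which is why hard-coded strings such as "delta_21_24" in TestCleaner were replaced with makeDeltaDirName(21, 24). The standalone sketch below only illustrates the naming difference; DeltaNameSketch and its methods are hypothetical, and the padding widths (%07d for transaction ids, an assumed %04d for the statement id) are inferred from expected paths in these tests (e.g. delta_0000010_0000019), not taken from the actual AcidUtils implementation.

// Illustrative sketch only: shows the two delta-directory naming schemes that
// useHive130DeltaDirName() toggles in the compactor tests. Not the real AcidUtils code.
public class DeltaNameSketch {

  // Pre-1.3.0 layout: one delta directory per transaction range.
  static String preHive130(long minTxn, long maxTxn) {
    return String.format("delta_%07d_%07d", minTxn, maxTxn);
  }

  // 1.3.0 layout: a statement id suffix allows one delta directory per write
  // within the same transaction range (padding width assumed here).
  static String hive130(long minTxn, long maxTxn, int stmtId) {
    return String.format("delta_%07d_%07d_%04d", minTxn, maxTxn, stmtId);
  }

  public static void main(String[] args) {
    System.out.println(preHive130(10, 19));   // delta_0000010_0000019
    System.out.println(hive130(10, 19, 0));   // delta_0000010_0000019_0000
  }
}

Both layouts must remain readable by 1.3.0 code, so the subclasses (TestCleaner2, TestWorker2) re-run the same assertions with the new format while the base test classes keep the old one.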