http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 77736ee..8caa265 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -230,11 +230,11 @@ public class VectorizedOrcAcidRowBatchReader private static final class OffsetAndBucketProperty { private final long rowIdOffset; private final int bucketProperty; - private final long syntheticTxnId; - private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) { + private final long syntheticWriteId; + private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticWriteId) { this.rowIdOffset = rowIdOffset; this.bucketProperty = bucketProperty; - this.syntheticTxnId = syntheticTxnId; + this.syntheticWriteId = syntheticWriteId; } } /** @@ -253,7 +253,7 @@ public class VectorizedOrcAcidRowBatchReader if (!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) { if(split.isOriginal()) { /** - * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that + * Even if we don't need to project ROW_IDs, we still need to check the write ID that * created the file to see if it's committed. See more in * {@link #next(NullWritable, VectorizedRowBatch)}. (In practice getAcidState() should * filter out base/delta files but this makes fewer dependencies) @@ -352,7 +352,7 @@ public class VectorizedOrcAcidRowBatchReader /** * There are 2 types of schema from the {@link #baseReader} that this handles. In the case * the data was written to a transactional table from the start, every row is decorated with - * transaction related info and looks like <op, otid, writerId, rowid, ctid, <f1, ... fn>>. + * transaction related info and looks like <op, owid, writerId, rowid, cwid, <f1, ... fn>>. * * The other case is when data was written to non-transactional table and thus only has the user * data: <f1, ... fn>. Then this table was then converted to a transactional table but the data @@ -360,7 +360,7 @@ public class VectorizedOrcAcidRowBatchReader * * In this case we may need to decorate the outgoing data with transactional column values at * read time. (It's done somewhat out of band via VectorizedRowBatchCtx - ask Teddy Choi). - * The "otid, writerId, rowid" columns represent {@link RecordIdentifier}. They are assigned + * The "owid, writerId, rowid" columns represent {@link RecordIdentifier}. They are assigned * each time the table is read in a way that needs to project {@link VirtualColumn#ROWID}. * Major compaction will attach these values to each row permanently. * It's critical that these generated column values are assigned exactly the same way by each @@ -420,8 +420,8 @@ public class VectorizedOrcAcidRowBatchReader // Handle synthetic row IDs for the original files. innerRecordIdColumnVector = handleOriginalFile(selectedBitSet, innerRecordIdColumnVector); } else { - // Case 1- find rows which belong to transactions that are not valid. - findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); + // Case 1- find rows which belong to write Ids that are not valid. + findRecordsWithInvalidWriteIds(vectorizedRowBatchBase, selectedBitSet); } // Case 2- find rows which have been deleted. @@ -495,7 +495,7 @@ public class VectorizedOrcAcidRowBatchReader */ recordIdColumnVector.fields[0].noNulls = true; recordIdColumnVector.fields[0].isRepeating = true; - ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticWriteId; /** * This is {@link RecordIdentifier#getBucketProperty()} * Also see {@link BucketCodec} @@ -521,21 +521,21 @@ public class VectorizedOrcAcidRowBatchReader //these are insert events so (original txn == current) txn for all rows innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0]; } - if(syntheticProps.syntheticTxnId > 0) { + if(syntheticProps.syntheticWriteId > 0) { //"originals" (written before table was converted to acid) is considered written by - // txnid:0 which is always committed so there is no need to check wrt invalid transactions + // writeid:0 which is always committed so there is no need to check wrt invalid write Ids //But originals written by Load Data for example can be in base_x or delta_x_x so we must //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline. if (needSyntheticRowId) { - findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, + findRecordsWithInvalidWriteIds(innerRecordIdColumnVector, vectorizedRowBatchBase.size, selectedBitSet); } else { /*since ROW_IDs are not needed we didn't create the ColumnVectors to hold them but we * still have to check if the data being read is committed as far as current * reader (transactions) is concerned. Since here we are reading 'original' schema file, - * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId' + * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticWriteId' */ - if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) { + if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticWriteId)) { selectedBitSet.clear(0, vectorizedRowBatchBase.size); } } @@ -543,29 +543,29 @@ public class VectorizedOrcAcidRowBatchReader return innerRecordIdColumnVector; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); + private void findRecordsWithInvalidWriteIds(VectorizedRowBatch batch, BitSet selectedBitSet) { + findRecordsWithInvalidWriteIds(batch.cols, batch.size, selectedBitSet); } - private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { + private void findRecordsWithInvalidWriteIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { if (cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating) { // When we have repeating values, we can unset the whole bitset at once - // if the repeating value is not a valid transaction. - long currentTransactionIdForBatch = ((LongColumnVector) + // if the repeating value is not a valid write id. + long currentWriteIdForBatch = ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[0]; - if (!validWriteIdList.isWriteIdValid(currentTransactionIdForBatch)) { + if (!validWriteIdList.isWriteIdValid(currentWriteIdForBatch)) { selectedBitSet.clear(0, size); } return; } - long[] currentTransactionVector = + long[] currentWriteIdVector = ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_WRITEID]).vector; // Loop through the bits that are set to true and mark those rows as false, if their - // current transactions are not valid. + // current write ids are not valid. for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { - if (!validWriteIdList.isWriteIdValid(currentTransactionVector[setBitIndex])) { + if (!validWriteIdList.isWriteIdValid(currentWriteIdVector[setBitIndex])) { selectedBitSet.clear(setBitIndex); } } @@ -690,7 +690,7 @@ public class VectorizedOrcAcidRowBatchReader return; } - long[] originalTransaction = + long[] originalWriteId = cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector; long[] bucket = @@ -701,7 +701,7 @@ public class VectorizedOrcAcidRowBatchReader : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; // The following repeatedX values will be set, if any of the columns are repeating. - long repeatedOriginalTransaction = (originalTransaction != null) ? -1 + long repeatedOriginalWriteId = (originalWriteId != null) ? -1 : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0]; long repeatedBucket = (bucket != null) ? -1 : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; @@ -716,7 +716,7 @@ public class VectorizedOrcAcidRowBatchReader } RecordIdentifier firstRecordIdInBatch = new RecordIdentifier( - originalTransaction != null ? originalTransaction[firstValidIndex] : repeatedOriginalTransaction, + originalWriteId != null ? originalWriteId[firstValidIndex] : repeatedOriginalWriteId, bucket != null ? (int) bucket[firstValidIndex] : (int) repeatedBucket, rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); @@ -724,7 +724,7 @@ public class VectorizedOrcAcidRowBatchReader int lastValidIndex = selectedBitSet.previousSetBit(size - 1); RecordIdentifier lastRecordIdInBatch = new RecordIdentifier( - originalTransaction != null ? originalTransaction[lastValidIndex] : repeatedOriginalTransaction, + originalWriteId != null ? originalWriteId[lastValidIndex] : repeatedOriginalWriteId, bucket != null ? (int) bucket[lastValidIndex] : (int) repeatedBucket, rowId != null ? (int) rowId[lastValidIndex] : repeatedRowId); @@ -743,7 +743,7 @@ public class VectorizedOrcAcidRowBatchReader RecordIdentifier currRecordIdInBatch = new RecordIdentifier(); while (isDeleteRecordAvailable && currIndex != -1 && currIndex <= lastValidIndex) { currRecordIdInBatch.setValues( - (originalTransaction != null) ? originalTransaction[currIndex] : repeatedOriginalTransaction, + (originalWriteId != null) ? originalWriteId[currIndex] : repeatedOriginalWriteId, (bucket != null) ? (int) bucket[currIndex] : (int) repeatedBucket, (rowId != null) ? rowId[currIndex] : repeatedRowId); @@ -780,34 +780,34 @@ public class VectorizedOrcAcidRowBatchReader * An implementation for DeleteEventRegistry that optimizes for performance by loading * all the delete events into memory at once from all the delete delta files. * It starts by reading all the delete events through a regular sort merge logic - * into 3 vectors- one for original transaction id (otid), one for bucket property and one for + * into 3 vectors- one for original Write id (owid), one for bucket property and one for * row id. See {@link BucketCodec} for more about bucket property. - * The otids are likely to be repeated very often, as a single transaction - * often deletes thousands of rows. Hence, the otid vector is compressed to only store the + * The owids are likely to be repeated very often, as a single transaction + * often deletes thousands of rows. Hence, the owid vector is compressed to only store the * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a * record id is deleted or not, is done by performing a binary search on the - * compressed otid range. If a match is found, then a binary search is then performed on + * compressed owid range. If a match is found, then a binary search is then performed on * the larger rowId vector between the given toIndex and fromIndex. Of course, there is rough * heuristic that prevents creation of an instance of this class if the memory pressure is high. * The SortMergedDeleteEventRegistry is then the fallback method for such scenarios. */ static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry { /** - * A simple wrapper class to hold the (otid, bucketProperty, rowId) pair. + * A simple wrapper class to hold the (owid, bucketProperty, rowId) pair. */ static class DeleteRecordKey implements Comparable<DeleteRecordKey> { - private long originalTransactionId; + private long originalWriteId; /** * see {@link BucketCodec} */ private int bucketProperty; private long rowId; DeleteRecordKey() { - this.originalTransactionId = -1; + this.originalWriteId = -1; this.rowId = -1; } - public void set(long otid, int bucketProperty, long rowId) { - this.originalTransactionId = otid; + public void set(long owid, int bucketProperty, long rowId) { + this.originalWriteId = owid; this.bucketProperty = bucketProperty; this.rowId = rowId; } @@ -817,8 +817,8 @@ public class VectorizedOrcAcidRowBatchReader if (other == null) { return -1; } - if (originalTransactionId != other.originalTransactionId) { - return originalTransactionId < other.originalTransactionId ? -1 : 1; + if (originalWriteId != other.originalWriteId) { + return originalWriteId < other.originalWriteId ? -1 : 1; } if(bucketProperty != other.bucketProperty) { return bucketProperty < other.bucketProperty ? -1 : 1; @@ -830,7 +830,7 @@ public class VectorizedOrcAcidRowBatchReader } @Override public String toString() { - return "otid: " + originalTransactionId + " bucketP:" + bucketProperty + " rowid: " + rowId; + return "owid: " + originalWriteId + " bucketP:" + bucketProperty + " rowid: " + rowId; } } @@ -881,12 +881,12 @@ public class VectorizedOrcAcidRowBatchReader return false; // no more batches to read, exhausted the reader. } } - long currentTransaction = setCurrentDeleteKey(deleteRecordKey); + long currentWriteId = setCurrentDeleteKey(deleteRecordKey); if(!isBucketPropertyRepeating) { checkBucketId(deleteRecordKey.bucketProperty); } ++indexPtrInBatch; - if (validWriteIdList.isWriteIdValid(currentTransaction)) { + if (validWriteIdList.isWriteIdValid(currentWriteId)) { isValidNext = true; } } @@ -897,20 +897,20 @@ public class VectorizedOrcAcidRowBatchReader this.recordReader.close(); } private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) { - int originalTransactionIndex = + int originalWriteIdIndex = batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? 0 : indexPtrInBatch; - long originalTransaction - = ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[originalTransactionIndex]; + long originalWriteId + = ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[originalWriteIdIndex]; int bucketPropertyIndex = batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : indexPtrInBatch; int bucketProperty = (int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex]; long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch]; - int currentTransactionIndex + int currentWriteIdIndex = batch.cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating ? 0 : indexPtrInBatch; - long currentTransaction - = ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[currentTransactionIndex]; - deleteRecordKey.set(originalTransaction, bucketProperty, rowId); - return currentTransaction; + long currentWriteId + = ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[currentWriteIdIndex]; + deleteRecordKey.set(originalWriteId, bucketProperty, rowId); + return currentWriteId; } private void checkBucketId() throws IOException { isBucketPropertyRepeating = batch.cols[OrcRecordUpdater.BUCKET].isRepeating; @@ -949,31 +949,31 @@ public class VectorizedOrcAcidRowBatchReader } } /** - * A CompressedOtid class stores a compressed representation of the original - * transaction ids (otids) read from the delete delta files. Since the record ids - * are sorted by (otid, rowId) and otids are highly likely to be repetitive, it is - * efficient to compress them as a CompressedOtid that stores the fromIndex and + * A CompressedOwid class stores a compressed representation of the original + * write ids (owids) read from the delete delta files. Since the record ids + * are sorted by (owid, rowId) and owids are highly likely to be repetitive, it is + * efficient to compress them as a CompressedOwid that stores the fromIndex and * the toIndex. These fromIndex and toIndex reference the larger vector formed by * concatenating the correspondingly ordered rowIds. */ - private final class CompressedOtid implements Comparable<CompressedOtid> { - final long originalTransactionId; + private final class CompressedOwid implements Comparable<CompressedOwid> { + final long originalWriteId; final int bucketProperty; final int fromIndex; // inclusive final int toIndex; // exclusive - CompressedOtid(long otid, int bucketProperty, int fromIndex, int toIndex) { - this.originalTransactionId = otid; + CompressedOwid(long owid, int bucketProperty, int fromIndex, int toIndex) { + this.originalWriteId = owid; this.bucketProperty = bucketProperty; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override - public int compareTo(CompressedOtid other) { - // When comparing the CompressedOtid, the one with the lesser value is smaller. - if (originalTransactionId != other.originalTransactionId) { - return originalTransactionId < other.originalTransactionId ? -1 : 1; + public int compareTo(CompressedOwid other) { + // When comparing the CompressedOwid, the one with the lesser value is smaller. + if (originalWriteId != other.originalWriteId) { + return originalWriteId < other.originalWriteId ? -1 : 1; } if(bucketProperty != other.bucketProperty) { return bucketProperty < other.bucketProperty ? -1 : 1; @@ -988,14 +988,14 @@ public class VectorizedOrcAcidRowBatchReader * all delete deltas at once - possibly causing OOM same as for {@link SortMergedDeleteEventRegistry} * which uses {@link OrcRawRecordMerger}. Why not load all delete_delta sequentially. Each * dd is sorted by {@link RecordIdentifier} so we could create a BTree like structure where the - * 1st level is an array of originalTransactionId where each entry points at an array + * 1st level is an array of originalWriteId where each entry points at an array * of bucketIds where each entry points at an array of rowIds. We could probably use ArrayList * to manage insertion as the structure is built (LinkedList?). This should reduce memory * footprint (as far as OrcReader to a single reader) - probably bad for LLAP IO */ private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger; private long rowIds[]; - private CompressedOtid compressedOtids[]; + private CompressedOwid compressedOwids[]; private ValidWriteIdList validWriteIdList; private Boolean isEmpty = null; @@ -1009,7 +1009,7 @@ public class VectorizedOrcAcidRowBatchReader + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf)); this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>(); this.rowIds = null; - this.compressedOtids = null; + this.compressedOwids = null; int maxEventsInMemory = HiveConf.getIntVar(conf, ConfVars.HIVE_TRANSACTIONAL_NUM_EVENTS_IN_MEMORY); final boolean isBucketedTable = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0; @@ -1064,7 +1064,7 @@ public class VectorizedOrcAcidRowBatchReader readAllDeleteEventsFromDeleteDeltas(); } } - isEmpty = compressedOtids == null || rowIds == null; + isEmpty = compressedOwids == null || rowIds == null; } catch(IOException|DeleteEventsOverflowMemoryException e) { close(); // close any open readers, if there was some exception during initialization. throw e; // rethrow the exception so that the caller can handle. @@ -1072,19 +1072,19 @@ public class VectorizedOrcAcidRowBatchReader } /** - * This is not done quite right. The intent of {@link CompressedOtid} is a hedge against + * This is not done quite right. The intent of {@link CompressedOwid} is a hedge against * "delete from T" that generates a huge number of delete events possibly even 2G - max array * size. (assuming no one txn inserts > 2G rows (in a bucket)). As implemented, the algorithm - * first loads all data into one array otid[] and rowIds[] which defeats the purpose. + * first loads all data into one array owid[] and rowIds[] which defeats the purpose. * In practice we should be filtering delete evens by min/max ROW_ID from the split. The later * is also not yet implemented: HIVE-16812. */ private void readAllDeleteEventsFromDeleteDeltas() throws IOException { if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read. - int distinctOtids = 0; - long lastSeenOtid = -1; + int distinctOwids = 0; + long lastSeenOwid = -1; int lastSeenBucketProperty = -1; - long otids[] = new long[rowIds.length]; + long owids[] = new long[rowIds.length]; int[] bucketProperties = new int [rowIds.length]; int index = 0; @@ -1101,14 +1101,14 @@ public class VectorizedOrcAcidRowBatchReader Entry<DeleteRecordKey, DeleteReaderValue> entry = sortMerger.pollFirstEntry(); DeleteRecordKey deleteRecordKey = entry.getKey(); DeleteReaderValue deleteReaderValue = entry.getValue(); - otids[index] = deleteRecordKey.originalTransactionId; + owids[index] = deleteRecordKey.originalWriteId; bucketProperties[index] = deleteRecordKey.bucketProperty; rowIds[index] = deleteRecordKey.rowId; ++index; - if (lastSeenOtid != deleteRecordKey.originalTransactionId || + if (lastSeenOwid != deleteRecordKey.originalWriteId || lastSeenBucketProperty != deleteRecordKey.bucketProperty) { - ++distinctOtids; - lastSeenOtid = deleteRecordKey.originalTransactionId; + ++distinctOwids; + lastSeenOwid = deleteRecordKey.originalWriteId; lastSeenBucketProperty = deleteRecordKey.bucketProperty; } if (deleteReaderValue.next(deleteRecordKey)) { @@ -1118,49 +1118,49 @@ public class VectorizedOrcAcidRowBatchReader } } - // Once we have processed all the delete events and seen all the distinct otids, - // we compress the otids into CompressedOtid data structure that records - // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid. - this.compressedOtids = new CompressedOtid[distinctOtids]; - lastSeenOtid = otids[0]; + // Once we have processed all the delete events and seen all the distinct owids, + // we compress the owids into CompressedOwid data structure that records + // the fromIndex(inclusive) and toIndex(exclusive) for each unique owid. + this.compressedOwids = new CompressedOwid[distinctOwids]; + lastSeenOwid = owids[0]; lastSeenBucketProperty = bucketProperties[0]; int fromIndex = 0, pos = 0; - for (int i = 1; i < otids.length; ++i) { - if (otids[i] != lastSeenOtid || lastSeenBucketProperty != bucketProperties[i]) { - compressedOtids[pos] = - new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, i); - lastSeenOtid = otids[i]; + for (int i = 1; i < owids.length; ++i) { + if (owids[i] != lastSeenOwid || lastSeenBucketProperty != bucketProperties[i]) { + compressedOwids[pos] = + new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, i); + lastSeenOwid = owids[i]; lastSeenBucketProperty = bucketProperties[i]; fromIndex = i; ++pos; } } - // account for the last distinct otid - compressedOtids[pos] = - new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, otids.length); + // account for the last distinct owid + compressedOwids[pos] = + new CompressedOwid(lastSeenOwid, lastSeenBucketProperty, fromIndex, owids.length); } - private boolean isDeleted(long otid, int bucketProperty, long rowId) { - if (compressedOtids == null || rowIds == null) { + private boolean isDeleted(long owid, int bucketProperty, long rowId) { + if (compressedOwids == null || rowIds == null) { return false; } - // To find if a given (otid, rowId) pair is deleted or not, we perform + // To find if a given (owid, rowId) pair is deleted or not, we perform // two binary searches at most. The first binary search is on the - // compressed otids. If a match is found, only then we do the next + // compressed owids. If a match is found, only then we do the next // binary search in the larger rowId vector between the given toIndex & fromIndex. - // Check if otid is outside the range of all otids present. - if (otid < compressedOtids[0].originalTransactionId - || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) { + // Check if owid is outside the range of all owids present. + if (owid < compressedOwids[0].originalWriteId + || owid > compressedOwids[compressedOwids.length - 1].originalWriteId) { return false; } - // Create a dummy key for searching the otid/bucket in the compressed otid ranges. - CompressedOtid key = new CompressedOtid(otid, bucketProperty, -1, -1); - int pos = Arrays.binarySearch(compressedOtids, key); + // Create a dummy key for searching the owid/bucket in the compressed owid ranges. + CompressedOwid key = new CompressedOwid(owid, bucketProperty, -1, -1); + int pos = Arrays.binarySearch(compressedOwids, key); if (pos >= 0) { - // Otid with the given value found! Searching now for rowId... - key = compressedOtids[pos]; // Retrieve the actual CompressedOtid that matched. - // Check if rowId is outside the range of all rowIds present for this otid. + // Owid with the given value found! Searching now for rowId... + key = compressedOwids[pos]; // Retrieve the actual CompressedOwid that matched. + // Check if rowId is outside the range of all rowIds present for this owid. if (rowId < rowIds[key.fromIndex] || rowId > rowIds[key.toIndex - 1]) { return false; @@ -1181,16 +1181,16 @@ public class VectorizedOrcAcidRowBatchReader @Override public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { - if (rowIds == null || compressedOtids == null) { + if (rowIds == null || compressedOwids == null) { return; } - // Iterate through the batch and for each (otid, rowid) in the batch + // Iterate through the batch and for each (owid, rowid) in the batch // check if it is deleted or not. - long[] originalTransactionVector = + long[] originalWriteIdVector = cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector; - long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 + long repeatedOriginalWriteId = (originalWriteIdVector != null) ? -1 : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0]; long[] bucketProperties = @@ -1205,12 +1205,12 @@ public class VectorizedOrcAcidRowBatchReader for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) { - long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex] - : repeatedOriginalTransaction ; + long owid = originalWriteIdVector != null ? originalWriteIdVector[setBitIndex] + : repeatedOriginalWriteId ; int bucketProperty = bucketProperties != null ? (int)bucketProperties[setBitIndex] : repeatedBucketProperty; long rowId = rowIdVector[setBitIndex]; - if (isDeleted(otid, bucketProperty, rowId)) { + if (isDeleted(owid, bucketProperty, rowId)) { selectedBitSet.clear(setBitIndex); } }
http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 68a87e6..c0be51e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1740,17 +1740,17 @@ public class Hive { * Load Data commands for fullAcid tables write to base_x (if there is overwrite clause) or * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add * this path component. - * @param txnId - id of current transaction (in which this operation is running) + * @param writeId - write id of the operated table from current transaction (in which this operation is running) * @param stmtId - see {@link DbTxnManager#getStmtIdAndIncrement()} * @return appropriately modified path */ - private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException { + private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long writeId, int stmtId, Table tbl) throws HiveException { switch (loadFileType) { case REPLACE_ALL: - destPath = new Path(destPath, AcidUtils.baseDir(txnId)); + destPath = new Path(destPath, AcidUtils.baseDir(writeId)); break; case KEEP_EXISTING: - destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); break; case OVERWRITE_EXISTING: //should not happen here - this is for replication @@ -1772,9 +1772,9 @@ public class Hive { return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; } - private List<Path> listFilesCreatedByQuery(Path loadPath, long txnId, int stmtId) throws HiveException { + private List<Path> listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId) throws HiveException { List<Path> newFiles = new ArrayList<Path>(); - final String filePrefix = AcidUtils.deltaSubdir(txnId, txnId, stmtId); + final String filePrefix = AcidUtils.deltaSubdir(writeId, writeId, stmtId); FileStatus[] srcs; FileSystem srcFs; try { @@ -1940,7 +1940,7 @@ private void constructOneLBLocationMap(FileStatus fSta, * @throws HiveException */ private Set<Path> getValidPartitionsInPath( - int numDP, int numLB, Path loadPath, Long txnId, int stmtId, + int numDP, int numLB, Path loadPath, Long writeId, int stmtId, boolean isMmTable, boolean isInsertOverwrite) throws HiveException { Set<Path> validPartitions = new HashSet<Path>(); try { @@ -1964,7 +1964,7 @@ private void constructOneLBLocationMap(FileStatus fSta, Utilities.FILE_OP_LOGGER.trace( "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP); Path[] leafStatus = Utilities.getMmDirectoryCandidates( - fs, loadPath, numDP, numLB, null, txnId, -1, conf, isInsertOverwrite); + fs, loadPath, numDP, numLB, null, writeId, -1, conf, isInsertOverwrite); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found. for (int i = 0; i < numLB; ++i) { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index c3c029e..7a74a60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -126,7 +126,7 @@ public abstract class BaseSemanticAnalyzer { /** * A set of FileSinkOperators being written to in an ACID compliant way. We need to remember - * them here because when we build them we don't yet know the transaction id. We need to go + * them here because when we build them we don't yet know the write id. We need to go * back and set it once we actually start running the query. * This also contains insert-only sinks. */ http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 31da66a..4870a90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -281,7 +281,7 @@ public class CompactorMR { String minOpenInfo = "."; if(writeIds.getMinOpenWriteId() != null) { minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) + - ". Compaction cannot compact above this txnid"; + ". Compaction cannot compact above this writeId"; } LOG.error("No delta files or original files found to compact in " + sd.getLocation() + " for compactionId=" + ci.id + minOpenInfo); http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index b90f5b1..12d57c6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -92,11 +92,11 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { /** * Useful for debugging. Dumps ORC file in JSON to CWD. */ - private void dumpBucketData(Table table, long txnId, int stmtId, int bucketNum) throws Exception { + private void dumpBucketData(Table table, long writeId, int stmtId, int bucketNum) throws Exception { if(true) { return; } - Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(txnId, txnId, stmtId)), bucketNum); + Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(writeId, writeId, stmtId)), bucketNum); FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName()); // try { // FileDump.printJsonData(conf, bucket.toString(), delta); @@ -109,9 +109,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { /** * Dump all data in the table by bucket in JSON format */ - private void dumpTableData(Table table, long txnId, int stmtId) throws Exception { + private void dumpTableData(Table table, long writeId, int stmtId) throws Exception { for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) { - dumpBucketData(table, txnId, stmtId, bucketNum); + dumpBucketData(table, writeId, stmtId, bucketNum); } } @Test @@ -765,16 +765,16 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1))); Assert.assertEquals("", 4, rs.size()); Assert.assertTrue(rs.get(0), - rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); + rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1")); Assert.assertTrue(rs.get(1), - rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0")); Assert.assertTrue(rs.get(2), - rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); Assert.assertTrue(rs.get(3), - rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000001_0000001_0000/bucket_00001")); //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); @@ -786,16 +786,16 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { } Assert.assertEquals("", 4, rs.size()); Assert.assertTrue(rs.get(0), - rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); + rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000001/bucket_00000")); Assert.assertTrue(rs.get(1), - rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); + rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2")); Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000001/bucket_00001")); Assert.assertTrue(rs.get(2), - rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); + rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000001/bucket_00001")); Assert.assertTrue(rs.get(3), - rs.get(3).startsWith("{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); + rs.get(3).startsWith("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000001/bucket_00001")); //make sure they are the same before and after compaction http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 2eead9e..b832f71 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -366,15 +366,15 @@ public class TestTxnCommands2 { * Note: order of rows in a file ends up being the reverse of order in values clause (why?!) */ String[][] expected = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"}, - {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"}, - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"} + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13", "bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6", "bucket_00001"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"} }; Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size()); for(int i = 0; i < expected.length; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index 0a305a4..8a01de3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -105,13 +105,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][]{ - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}}; + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}}; checkResult(expected, testQuery, isVectorized, "load data inpath"); runStatementOnDriver("update T set b = 17 where a = 1"); String[][] expected2 = new String[][]{ - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0000/bucket_00000"} }; checkResult(expected2, testQuery, isVectorized, "update"); @@ -121,15 +121,15 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected3 = new String[][] { - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004/bucket_00000"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004/bucket_00000"} + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004/bucket_00000"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004/bucket_00000"} }; checkResult(expected3, testQuery, isVectorized, "delete compact minor"); runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T"); String[][] expected4 = new String[][]{ - {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"}, - {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}}; + {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"}, + {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}}; checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite"); //load same data again (additive) @@ -138,9 +138,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("delete from T where a = 3");//matches 2 rows runStatementOnDriver("insert into T values(2,2)"); String[][] expected5 = new String[][]{ - {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, - {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, - {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"} + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000007_0000007_0000/bucket_00000"}, + {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000"} }; checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update"); @@ -148,9 +148,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected6 = new String[][]{ - {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009/bucket_00000"}, - {"{\"transactionid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009/bucket_00000"}, - {"{\"transactionid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009/bucket_00000"} + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009/bucket_00000"}, + {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009/bucket_00000"}, + {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009/bucket_00000"} }; checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); } @@ -174,21 +174,21 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { //normal insert - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, //Load Data - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}}; + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}}; checkResult(expected, testQuery, isVectorized, "load data inpath"); //test minor compaction runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected1 = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002/bucket_00000"} }; checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)"); @@ -197,11 +197,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected2 = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003/bucket_00000"} }; checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); @@ -210,8 +210,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'"); runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); String[][] expected3 = new String[][] { - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"}, - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}}; + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"}, + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}}; checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite"); //one more major compaction @@ -219,9 +219,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected4 = new String[][] { - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005/bucket_00000"}, - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005/bucket_00000"}, - {"{\"transactionid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005/bucket_00000"}}; + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005/bucket_00000"}, + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005/bucket_00000"}, + {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005/bucket_00000"}}; checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)"); } /** @@ -252,22 +252,22 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; /* -{"transactionid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0 -{"transactionid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0 -{"transactionid":1,"bucketid":536870912,"rowid":0} 4 4/delta_0000001_0000001_0000/000000_0 -{"transactionid":1,"bucketid":536870912,"rowid":1} 5 5/delta_0000001_0000001_0000/000000_0 +{"writeid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0 +{"writeid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0 +{"writeid":1,"bucketid":536870912,"rowid":0} 4 4/delta_0000001_0000001_0000/000000_0 +{"writeid":1,"bucketid":536870912,"rowid":1} 5 5/delta_0000001_0000001_0000/000000_0 */ String[][] expected = new String[][] { //from pre-acid insert - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"}, //from Load Data into acid converted table - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"}, - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"}, - {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"}, - {"{\"transactionid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t2", "t/delta_0000001_0000001_0000/000001_0"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", "t/delta_0000001_0000001_0000/000001_0"}, + {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t4\t4", "t/delta_0000001_0000001_0000/000002_0"}, + {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_0000001_0000001_0000/000002_0"}, }; checkResult(expected, testQuery, isVectorized, "load data inpath"); @@ -278,9 +278,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T"); String[][] expected2 = new String[][] { - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"} + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000002/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000002/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000002/000001_0"} }; checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite"); @@ -290,10 +290,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { TestTxnCommands2.runWorker(hiveConf); String[][] expected3 = new String[][] { - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"}, - {"{\"transactionid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"} + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000003/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "t/base_0000003/bucket_00001"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000003/bucket_00000"} }; checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)"); } @@ -324,12 +324,12 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); String[][] expected = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"}, - {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"}, - {"{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}}; + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000002_0000002_0000/000000_0"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000002_0000002_0000/000000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000003_0000003_0000/000000_0"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000003_0000003_0000/000000_0"}}; checkExpected(rs, expected, "load data inpath partitioned"); @@ -338,10 +338,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { runStatementOnDriver("truncate table Tstage"); runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)"); String[][] expected2 = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"}, - {"{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}}; + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000001_0000001_0000/000000_0"}, + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000004/000000_0"}, + {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000004/000000_0"}}; rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); checkExpected(rs, expected2, "load data inpath partitioned overwrite"); } @@ -403,20 +403,20 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"} }; checkResult(expected, testQuery, isVectorized, "load data inpath"); runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); String[][] expected2 = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001/bucket_00000"} }; checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154 @@ -442,8 +442,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests { String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000"} }; checkResult(expected, testQuery, isVectorized, "load data inpath"); }
