http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java index adb0c44..26d1b76 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java @@ -43,6 +43,8 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc private static final org.apache.thrift.protocol.TField DBNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbname", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField TABLENAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tablename", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField PARTITIONNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("partitionname", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField OPERATION_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationType", org.apache.thrift.protocol.TType.I32, (short)6); + private static final org.apache.thrift.protocol.TField IS_ACID_FIELD_DESC = new org.apache.thrift.protocol.TField("isAcid", org.apache.thrift.protocol.TType.BOOL, (short)7); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -55,6 +57,8 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc private String dbname; // required private String tablename; // optional private String partitionname; // optional + private DataOperationType operationType; // optional + private boolean isAcid; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -70,7 +74,13 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc LEVEL((short)2, "level"), DBNAME((short)3, "dbname"), TABLENAME((short)4, "tablename"), - PARTITIONNAME((short)5, "partitionname"); + PARTITIONNAME((short)5, "partitionname"), + /** + * + * @see DataOperationType + */ + OPERATION_TYPE((short)6, "operationType"), + IS_ACID((short)7, "isAcid"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -95,6 +105,10 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc return TABLENAME; case 5: // PARTITIONNAME return PARTITIONNAME; + case 6: // OPERATION_TYPE + return OPERATION_TYPE; + case 7: // IS_ACID + return IS_ACID; default: return null; } @@ -135,7 +149,9 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc } // isset id assignments - private static final _Fields optionals[] = {_Fields.TABLENAME,_Fields.PARTITIONNAME}; + private static final int __ISACID_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.TABLENAME,_Fields.PARTITIONNAME,_Fields.OPERATION_TYPE,_Fields.IS_ACID}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -149,11 +165,19 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.PARTITIONNAME, new org.apache.thrift.meta_data.FieldMetaData("partitionname", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.OPERATION_TYPE, new org.apache.thrift.meta_data.FieldMetaData("operationType", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, DataOperationType.class))); + tmpMap.put(_Fields.IS_ACID, new org.apache.thrift.meta_data.FieldMetaData("isAcid", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockComponent.class, metaDataMap); } public LockComponent() { + this.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.UNSET; + + this.isAcid = false; + } public LockComponent( @@ -171,6 +195,7 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc * Performs a deep copy on <i>other</i>. */ public LockComponent(LockComponent other) { + __isset_bitfield = other.__isset_bitfield; if (other.isSetType()) { this.type = other.type; } @@ -186,6 +211,10 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc if (other.isSetPartitionname()) { this.partitionname = other.partitionname; } + if (other.isSetOperationType()) { + this.operationType = other.operationType; + } + this.isAcid = other.isAcid; } public LockComponent deepCopy() { @@ -199,6 +228,10 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc this.dbname = null; this.tablename = null; this.partitionname = null; + this.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.UNSET; + + this.isAcid = false; + } /** @@ -332,6 +365,59 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc } } + /** + * + * @see DataOperationType + */ + public DataOperationType getOperationType() { + return this.operationType; + } + + /** + * + * @see DataOperationType + */ + public void setOperationType(DataOperationType operationType) { + this.operationType = operationType; + } + + public void unsetOperationType() { + this.operationType = null; + } + + /** Returns true if field operationType is set (has been assigned a value) and false otherwise */ + public boolean isSetOperationType() { + return this.operationType != null; + } + + public void setOperationTypeIsSet(boolean value) { + if (!value) { + this.operationType = null; + } + } + + public boolean isIsAcid() { + return this.isAcid; + } + + public void setIsAcid(boolean isAcid) { + this.isAcid = isAcid; + setIsAcidIsSet(true); + } + + public void unsetIsAcid() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ISACID_ISSET_ID); + } + + /** Returns true if field isAcid is set (has been assigned a value) and false otherwise */ + public boolean isSetIsAcid() { + return EncodingUtils.testBit(__isset_bitfield, __ISACID_ISSET_ID); + } + + public void setIsAcidIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISACID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TYPE: @@ -374,6 +460,22 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc } break; + case OPERATION_TYPE: + if (value == null) { + unsetOperationType(); + } else { + setOperationType((DataOperationType)value); + } + break; + + case IS_ACID: + if (value == null) { + unsetIsAcid(); + } else { + setIsAcid((Boolean)value); + } + break; + } } @@ -394,6 +496,12 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc case PARTITIONNAME: return getPartitionname(); + case OPERATION_TYPE: + return getOperationType(); + + case IS_ACID: + return isIsAcid(); + } throw new IllegalStateException(); } @@ -415,6 +523,10 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc return isSetTablename(); case PARTITIONNAME: return isSetPartitionname(); + case OPERATION_TYPE: + return isSetOperationType(); + case IS_ACID: + return isSetIsAcid(); } throw new IllegalStateException(); } @@ -477,6 +589,24 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc return false; } + boolean this_present_operationType = true && this.isSetOperationType(); + boolean that_present_operationType = true && that.isSetOperationType(); + if (this_present_operationType || that_present_operationType) { + if (!(this_present_operationType && that_present_operationType)) + return false; + if (!this.operationType.equals(that.operationType)) + return false; + } + + boolean this_present_isAcid = true && this.isSetIsAcid(); + boolean that_present_isAcid = true && that.isSetIsAcid(); + if (this_present_isAcid || that_present_isAcid) { + if (!(this_present_isAcid && that_present_isAcid)) + return false; + if (this.isAcid != that.isAcid) + return false; + } + return true; } @@ -509,6 +639,16 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc if (present_partitionname) list.add(partitionname); + boolean present_operationType = true && (isSetOperationType()); + list.add(present_operationType); + if (present_operationType) + list.add(operationType.getValue()); + + boolean present_isAcid = true && (isSetIsAcid()); + list.add(present_isAcid); + if (present_isAcid) + list.add(isAcid); + return list.hashCode(); } @@ -570,6 +710,26 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc return lastComparison; } } + lastComparison = Boolean.valueOf(isSetOperationType()).compareTo(other.isSetOperationType()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOperationType()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationType, other.operationType); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetIsAcid()).compareTo(other.isSetIsAcid()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetIsAcid()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isAcid, other.isAcid); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -633,6 +793,22 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc } first = false; } + if (isSetOperationType()) { + if (!first) sb.append(", "); + sb.append("operationType:"); + if (this.operationType == null) { + sb.append("null"); + } else { + sb.append(this.operationType); + } + first = false; + } + if (isSetIsAcid()) { + if (!first) sb.append(", "); + sb.append("isAcid:"); + sb.append(this.isAcid); + first = false; + } sb.append(")"); return sb.toString(); } @@ -664,6 +840,8 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -728,6 +906,22 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // OPERATION_TYPE + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.findByValue(iprot.readI32()); + struct.setOperationTypeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 7: // IS_ACID + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.isAcid = iprot.readBool(); + struct.setIsAcidIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -770,6 +964,18 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc oprot.writeFieldEnd(); } } + if (struct.operationType != null) { + if (struct.isSetOperationType()) { + oprot.writeFieldBegin(OPERATION_TYPE_FIELD_DESC); + oprot.writeI32(struct.operationType.getValue()); + oprot.writeFieldEnd(); + } + } + if (struct.isSetIsAcid()) { + oprot.writeFieldBegin(IS_ACID_FIELD_DESC); + oprot.writeBool(struct.isAcid); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -797,13 +1003,25 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc if (struct.isSetPartitionname()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetOperationType()) { + optionals.set(2); + } + if (struct.isSetIsAcid()) { + optionals.set(3); + } + oprot.writeBitSet(optionals, 4); if (struct.isSetTablename()) { oprot.writeString(struct.tablename); } if (struct.isSetPartitionname()) { oprot.writeString(struct.partitionname); } + if (struct.isSetOperationType()) { + oprot.writeI32(struct.operationType.getValue()); + } + if (struct.isSetIsAcid()) { + oprot.writeBool(struct.isAcid); + } } @Override @@ -815,7 +1033,7 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc struct.setLevelIsSet(true); struct.dbname = iprot.readString(); struct.setDbnameIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { struct.tablename = iprot.readString(); struct.setTablenameIsSet(true); @@ -824,6 +1042,14 @@ public class LockComponent implements org.apache.thrift.TBase<LockComponent, Loc struct.partitionname = iprot.readString(); struct.setPartitionnameIsSet(true); } + if (incoming.get(2)) { + struct.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.findByValue(iprot.readI32()); + struct.setOperationTypeIsSet(true); + } + if (incoming.get(3)) { + struct.isAcid = iprot.readBool(); + struct.setIsAcidIsSet(true); + } } }
http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/gen/thrift/gen-php/metastore/Types.php ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php index fe25366..045864a 100644 --- a/metastore/src/gen/thrift/gen-php/metastore/Types.php +++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -114,6 +114,23 @@ final class GrantRevokeType { ); } +final class DataOperationType { + const SELECT = 1; + const INSERT = 2; + const UPDATE = 3; + const DELETE = 4; + const UNSET = 5; + const NO_TXN = 6; + static public $__names = array( + 1 => 'SELECT', + 2 => 'INSERT', + 3 => 'UPDATE', + 4 => 'DELETE', + 5 => 'UNSET', + 6 => 'NO_TXN', + ); +} + final class EventRequestType { const INSERT = 1; const UPDATE = 2; @@ -11049,6 +11066,14 @@ class LockComponent { * @var string */ public $partitionname = null; + /** + * @var int + */ + public $operationType = 5; + /** + * @var bool + */ + public $isAcid = false; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -11073,6 +11098,14 @@ class LockComponent { 'var' => 'partitionname', 'type' => TType::STRING, ), + 6 => array( + 'var' => 'operationType', + 'type' => TType::I32, + ), + 7 => array( + 'var' => 'isAcid', + 'type' => TType::BOOL, + ), ); } if (is_array($vals)) { @@ -11091,6 +11124,12 @@ class LockComponent { if (isset($vals['partitionname'])) { $this->partitionname = $vals['partitionname']; } + if (isset($vals['operationType'])) { + $this->operationType = $vals['operationType']; + } + if (isset($vals['isAcid'])) { + $this->isAcid = $vals['isAcid']; + } } } @@ -11148,6 +11187,20 @@ class LockComponent { $xfer += $input->skip($ftype); } break; + case 6: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->operationType); + } else { + $xfer += $input->skip($ftype); + } + break; + case 7: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->isAcid); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -11186,6 +11239,16 @@ class LockComponent { $xfer += $output->writeString($this->partitionname); $xfer += $output->writeFieldEnd(); } + if ($this->operationType !== null) { + $xfer += $output->writeFieldBegin('operationType', TType::I32, 6); + $xfer += $output->writeI32($this->operationType); + $xfer += $output->writeFieldEnd(); + } + if ($this->isAcid !== null) { + $xfer += $output->writeFieldBegin('isAcid', TType::BOOL, 7); + $xfer += $output->writeBool($this->isAcid); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; @@ -13378,6 +13441,10 @@ class AddDynamicPartitions { * @var string[] */ public $partitionnames = null; + /** + * @var int + */ + public $operationType = 5; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -13402,6 +13469,10 @@ class AddDynamicPartitions { 'type' => TType::STRING, ), ), + 5 => array( + 'var' => 'operationType', + 'type' => TType::I32, + ), ); } if (is_array($vals)) { @@ -13417,6 +13488,9 @@ class AddDynamicPartitions { if (isset($vals['partitionnames'])) { $this->partitionnames = $vals['partitionnames']; } + if (isset($vals['operationType'])) { + $this->operationType = $vals['operationType']; + } } } @@ -13477,6 +13551,13 @@ class AddDynamicPartitions { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->operationType); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -13522,6 +13603,11 @@ class AddDynamicPartitions { } $xfer += $output->writeFieldEnd(); } + if ($this->operationType !== null) { + $xfer += $output->writeFieldBegin('operationType', TType::I32, 5); + $xfer += $output->writeI32($this->operationType); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index c59fa3e..29ba9b1 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -168,6 +168,32 @@ class GrantRevokeType: "REVOKE": 2, } +class DataOperationType: + SELECT = 1 + INSERT = 2 + UPDATE = 3 + DELETE = 4 + UNSET = 5 + NO_TXN = 6 + + _VALUES_TO_NAMES = { + 1: "SELECT", + 2: "INSERT", + 3: "UPDATE", + 4: "DELETE", + 5: "UNSET", + 6: "NO_TXN", + } + + _NAMES_TO_VALUES = { + "SELECT": 1, + "INSERT": 2, + "UPDATE": 3, + "DELETE": 4, + "UNSET": 5, + "NO_TXN": 6, + } + class EventRequestType: INSERT = 1 UPDATE = 2 @@ -7633,6 +7659,8 @@ class LockComponent: - dbname - tablename - partitionname + - operationType + - isAcid """ thrift_spec = ( @@ -7642,14 +7670,18 @@ class LockComponent: (3, TType.STRING, 'dbname', None, None, ), # 3 (4, TType.STRING, 'tablename', None, None, ), # 4 (5, TType.STRING, 'partitionname', None, None, ), # 5 + (6, TType.I32, 'operationType', None, 5, ), # 6 + (7, TType.BOOL, 'isAcid', None, False, ), # 7 ) - def __init__(self, type=None, level=None, dbname=None, tablename=None, partitionname=None,): + def __init__(self, type=None, level=None, dbname=None, tablename=None, partitionname=None, operationType=thrift_spec[6][4], isAcid=thrift_spec[7][4],): self.type = type self.level = level self.dbname = dbname self.tablename = tablename self.partitionname = partitionname + self.operationType = operationType + self.isAcid = isAcid def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -7685,6 +7717,16 @@ class LockComponent: self.partitionname = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I32: + self.operationType = iprot.readI32() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.BOOL: + self.isAcid = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -7715,6 +7757,14 @@ class LockComponent: oprot.writeFieldBegin('partitionname', TType.STRING, 5) oprot.writeString(self.partitionname) oprot.writeFieldEnd() + if self.operationType is not None: + oprot.writeFieldBegin('operationType', TType.I32, 6) + oprot.writeI32(self.operationType) + oprot.writeFieldEnd() + if self.isAcid is not None: + oprot.writeFieldBegin('isAcid', TType.BOOL, 7) + oprot.writeBool(self.isAcid) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -7735,6 +7785,8 @@ class LockComponent: value = (value * 31) ^ hash(self.dbname) value = (value * 31) ^ hash(self.tablename) value = (value * 31) ^ hash(self.partitionname) + value = (value * 31) ^ hash(self.operationType) + value = (value * 31) ^ hash(self.isAcid) return value def __repr__(self): @@ -9300,6 +9352,7 @@ class AddDynamicPartitions: - dbname - tablename - partitionnames + - operationType """ thrift_spec = ( @@ -9308,13 +9361,15 @@ class AddDynamicPartitions: (2, TType.STRING, 'dbname', None, None, ), # 2 (3, TType.STRING, 'tablename', None, None, ), # 3 (4, TType.LIST, 'partitionnames', (TType.STRING,None), None, ), # 4 + (5, TType.I32, 'operationType', None, 5, ), # 5 ) - def __init__(self, txnid=None, dbname=None, tablename=None, partitionnames=None,): + def __init__(self, txnid=None, dbname=None, tablename=None, partitionnames=None, operationType=thrift_spec[5][4],): self.txnid = txnid self.dbname = dbname self.tablename = tablename self.partitionnames = partitionnames + self.operationType = operationType def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -9350,6 +9405,11 @@ class AddDynamicPartitions: iprot.readListEnd() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I32: + self.operationType = iprot.readI32() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -9379,6 +9439,10 @@ class AddDynamicPartitions: oprot.writeString(iter447) oprot.writeListEnd() oprot.writeFieldEnd() + if self.operationType is not None: + oprot.writeFieldBegin('operationType', TType.I32, 5) + oprot.writeI32(self.operationType) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -9400,6 +9464,7 @@ class AddDynamicPartitions: value = (value * 31) ^ hash(self.dbname) value = (value * 31) ^ hash(self.tablename) value = (value * 31) ^ hash(self.partitionnames) + value = (value * 31) ^ hash(self.operationType) return value def __repr__(self): http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 2874308..662658c 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -79,6 +79,17 @@ module GrantRevokeType VALID_VALUES = Set.new([GRANT, REVOKE]).freeze end +module DataOperationType + SELECT = 1 + INSERT = 2 + UPDATE = 3 + DELETE = 4 + UNSET = 5 + NO_TXN = 6 + VALUE_MAP = {1 => "SELECT", 2 => "INSERT", 3 => "UPDATE", 4 => "DELETE", 5 => "UNSET", 6 => "NO_TXN"} + VALID_VALUES = Set.new([SELECT, INSERT, UPDATE, DELETE, UNSET, NO_TXN]).freeze +end + module EventRequestType INSERT = 1 UPDATE = 2 @@ -1721,13 +1732,17 @@ class LockComponent DBNAME = 3 TABLENAME = 4 PARTITIONNAME = 5 + OPERATIONTYPE = 6 + ISACID = 7 FIELDS = { TYPE => {:type => ::Thrift::Types::I32, :name => 'type', :enum_class => ::LockType}, LEVEL => {:type => ::Thrift::Types::I32, :name => 'level', :enum_class => ::LockLevel}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename', :optional => true}, - PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true} + PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}, + OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 5, :optional => true, :enum_class => ::DataOperationType}, + ISACID => {:type => ::Thrift::Types::BOOL, :name => 'isAcid', :default => false, :optional => true} } def struct_fields; FIELDS; end @@ -1742,6 +1757,9 @@ class LockComponent unless @level.nil? || ::LockLevel::VALID_VALUES.include?(@level) raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field level!') end + unless @operationType.nil? || ::DataOperationType::VALID_VALUES.include?(@operationType) + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field operationType!') + end end ::Thrift::Struct.generate_accessors self @@ -2103,12 +2121,14 @@ class AddDynamicPartitions DBNAME = 2 TABLENAME = 3 PARTITIONNAMES = 4 + OPERATIONTYPE = 5 FIELDS = { TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename'}, - PARTITIONNAMES => {:type => ::Thrift::Types::LIST, :name => 'partitionnames', :element => {:type => ::Thrift::Types::STRING}} + PARTITIONNAMES => {:type => ::Thrift::Types::LIST, :name => 'partitionnames', :element => {:type => ::Thrift::Types::STRING}}, + OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 5, :optional => true, :enum_class => ::DataOperationType} } def struct_fields; FIELDS; end @@ -2118,6 +2138,9 @@ class AddDynamicPartitions raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbname is unset!') unless @dbname raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablename is unset!') unless @tablename raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field partitionnames is unset!') unless @partitionnames + unless @operationType.nil? || ::DataOperationType::VALID_VALUES.include?(@operationType) + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field operationType!') + end end ::Thrift::Struct.generate_accessors self http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 94d5d86..83d533f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest; @@ -1958,10 +1959,18 @@ public class HiveMetaStoreClient implements IMetaStoreClient { return client.show_compact(new ShowCompactRequest()); } + @Deprecated @Override public void addDynamicPartitions(long txnId, String dbName, String tableName, List<String> partNames) throws TException { client.add_dynamic_partitions(new AddDynamicPartitions(txnId, dbName, tableName, partNames)); + } + @Override + public void addDynamicPartitions(long txnId, String dbName, String tableName, + List<String> partNames, DataOperationType operationType) throws TException { + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, partNames); + adp.setOperationType(operationType); + client.add_dynamic_partitions(adp); } @InterfaceAudience.LimitedPrivate({"HCatalog"}) http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index da693f7..fc7b70f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventResponse; @@ -61,7 +62,6 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; @@ -1402,6 +1402,12 @@ public interface IMetaStoreClient { ShowCompactResponse showCompactions() throws TException; /** + * @deprecated in Hive 1.3.0/2.1.0 - will be removed in 2 releases + */ + @Deprecated + void addDynamicPartitions(long txnId, String dbName, String tableName, List<String> partNames) + throws TException; + /** * Send a list of partitions to the metastore to indicate which partitions were loaded * dynamically. * @param txnId id of the transaction @@ -1410,7 +1416,8 @@ public interface IMetaStoreClient { * @param partNames partition name, as constructed by Warehouse.makePartName * @throws TException */ - void addDynamicPartitions(long txnId, String dbName, String tableName, List<String> partNames) + void addDynamicPartitions(long txnId, String dbName, String tableName, List<String> partNames, + DataOperationType operationType) throws TException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java index acd4653..3e8f193 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.LockComponent; import org.apache.hadoop.hive.metastore.api.LockLevel; import org.apache.hadoop.hive.metastore.api.LockType; @@ -70,7 +71,16 @@ public class LockComponentBuilder { component.setDbname(dbName); return this; } + + public LockComponentBuilder setOperationType(DataOperationType dop) { + component.setOperationType(dop); + return this; + } + public LockComponentBuilder setIsAcid(boolean t) { + component.setIsAcid(t); + return this; + } /** * Set the table name. * @param tableName table name http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java index 2fa7e07..6317a96 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java @@ -35,10 +35,19 @@ public class LockRequestBuilder { private LockTrie trie; private boolean userSet; + /** + * @deprecated + */ public LockRequestBuilder() { + this(null); + } + public LockRequestBuilder(String agentInfo) { req = new LockRequest(); trie = new LockTrie(); userSet = false; + if(agentInfo != null) { + req.setAgentInfo(agentInfo); + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 0023f08..4da5542 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -131,7 +131,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { static private boolean doRetryOnConnPool = false; private enum OpertaionType { - INSERT('i'), UPDATE('u'), DELETE('d'); + SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'); private final char sqlConst; OpertaionType(char sqlConst) { this.sqlConst = sqlConst; @@ -141,6 +141,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } public static OpertaionType fromString(char sqlConst) { switch (sqlConst) { + case 's': + return SELECT; case 'i': return INSERT; case 'u': @@ -151,16 +153,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { throw new IllegalArgumentException(quoteChar(sqlConst)); } } - //we should instead just pass in OpertaionType from client (HIVE-13622) - @Deprecated - public static OpertaionType fromLockType(LockType lockType) { - switch (lockType) { - case SHARED_READ: - return INSERT; - case SHARED_WRITE: - return UPDATE; + public static OpertaionType fromDataOperationType(DataOperationType dop) { + switch (dop) { + case SELECT: + return OpertaionType.SELECT; + case INSERT: + return OpertaionType.INSERT; + case UPDATE: + return OpertaionType.UPDATE; + case DELETE: + return OpertaionType.DELETE; default: - throw new IllegalArgumentException("Unexpected lock type: " + lockType); + throw new IllegalArgumentException("Unexpected value: " + dop); } } } @@ -674,20 +678,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { + int modCount = 0; + if ((modCount = stmt.executeUpdate(s)) < 1) { //this can be reasonable for an empty txn START/COMMIT or read-only txn LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + modCount = stmt.executeUpdate(s); s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + modCount = stmt.executeUpdate(s); s = "delete from TXNS where txn_id = " + txnid; LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + modCount = stmt.executeUpdate(s); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { @@ -829,7 +834,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { /** Get the next lock id. * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, - * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. Then 7 unblocks, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks, * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} * doesn't block on locks acquired later than one it's checking*/ String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID"); @@ -847,13 +852,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { stmt.executeUpdate(s); if (txnid > 0) { - /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get) - * So if we add that to LockRequest we'll know that here - * Should probably add it to LockComponent so that if in the future we decide wo allow 1 LockRequest - * to contain LockComponent for multiple operations. - * Deriving it from lock info doesn't distinguish between Update and Delete - * - * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc + /** + * todo QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc * FileSinkDesc.table is ql.metadata.Table * Table.tableSpec which is TableSpec, which has specType which is SpecType * So maybe this can work to know that this is part of dynamic partition insert in which case @@ -862,8 +862,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ // For each component in this lock request, // add an entry to the txn_components table - // This must be done before HIVE_LOCKS is accessed for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetIsAcid() && !lc.isIsAcid()) { + //we don't prevent using non-acid resources in a txn but we do lock them + continue; + } + boolean updateTxnComponents; + if(!lc.isSetOperationType()) { + //request came from old version of the client + updateTxnComponents = true;//this matches old behavior + } + else { + switch (lc.getOperationType()) { + case INSERT: + case UPDATE: + case DELETE: + updateTxnComponents = true; + break; + case SELECT: + updateTxnComponents = false; + break; + default: + //since we have an open transaction, only 4 values above are expected + throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); + } + } + if(!updateTxnComponents) { + continue; + } String dbName = lc.getDbname(); String tblName = lc.getTablename(); String partName = lc.getPartitionname(); @@ -872,14 +899,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { "values (" + txnid + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + ", " + (partName == null ? "null" : "'" + partName + "'")+ "," + - quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")"; + quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString()) + ")"; LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + int modCount = stmt.executeUpdate(s); } } long intLockId = 0; for (LockComponent lc : rqst.getComponent()) { + if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET) { + //old version of thrift client should have (lc.isSetOperationType() == false) + throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } intLockId++; String dbName = lc.getDbname(); String tblName = lc.getTablename(); @@ -1454,21 +1486,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); } - //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to - //for multi stmt txns if same table is written more than once per tx - // MoveTask knows if it's I/U/D - // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions() - // which ends up here so we'd need to add a field to AddDynamicPartitions. - String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() - + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); - //do limit 1 on this; currently they will all have the same operations - rs = stmt.executeQuery(addLimitClause(1, findOperationType)); - if(!rs.next()) { - throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid())); + //for RU this may be null so we should default it to 'u' which is most restrictive + OpertaionType ot = OpertaionType.UPDATE; + if(rqst.isSetOperationType()) { + ot = OpertaionType.fromDataOperationType(rqst.getOperationType()); } - OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0)); - //what if a txn writes the same table > 1 time... let's go with this for now, but really + //what if a txn writes the same table > 1 time...(HIVE-9675) let's go with this for now, but really //need to not write this in the first place, i.e. make this delete not needed //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" + @@ -1477,14 +1501,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually //written to - stmt.executeUpdate(deleteSql); + int modCount = stmt.executeUpdate(deleteSql); for (String partName : rqst.getPartitionnames()) { String s = "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" + rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")"; LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + modCount = stmt.executeUpdate(s); } LOG.debug("Going to commit"); dbConn.commit(); @@ -1504,8 +1528,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * Clean up corresponding records in metastore tables, specifically: - * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + * Clean up corresponding records in metastore tables when corresponding object is dropped, + * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS */ @Override public void cleanupRecords(HiveObjectType type, Database db, Table table, http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index fc00e6d..23ad54e 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.metastore.txn; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.log4j.Level; @@ -270,12 +268,14 @@ public class TestCompactionTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); req.setTxnid(txnid); @@ -306,6 +306,7 @@ public class TestCompactionTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -317,6 +318,7 @@ public class TestCompactionTxnHandler { txnid = openTxn(); comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); + comp.setOperationType(DataOperationType.DELETE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -329,6 +331,7 @@ public class TestCompactionTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("foo"); comp.setPartitionname("bar"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -339,6 +342,7 @@ public class TestCompactionTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("foo"); comp.setPartitionname("baz"); + comp.setOperationType(DataOperationType.UPDATE); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -395,13 +399,17 @@ public class TestCompactionTxnHandler { // lock a table, as in dynamic partitions LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName); lc.setTablename(tableName); + DataOperationType dop = DataOperationType.UPDATE; + lc.setOperationType(dop); LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); lr.setTxnid(txnId); LockResponse lock = txnHandler.lock(lr); assertEquals(LockState.ACQUIRED, lock.getState()); - txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName, - Arrays.asList("ds=yesterday", "ds=today"))); + AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today")); + adp.setOperationType(dop); + txnHandler.addDynamicPartitions(adp); txnHandler.commitTxn(new CommitTxnRequest(txnId)); Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000); http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index ccaf91c..0d4fc59 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -177,6 +177,7 @@ public class TestTxnHandler { public void testLockDifferentDBs() throws Exception { // Test that two different databases don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -184,6 +185,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -195,6 +197,7 @@ public class TestTxnHandler { public void testLockSameDB() throws Exception { // Test that two different databases don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -202,6 +205,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -213,6 +217,7 @@ public class TestTxnHandler { public void testLockDbLocksTable() throws Exception { // Test that locking a database prevents locking of tables in the database LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -220,6 +225,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); components.clear(); components.add(comp); @@ -232,6 +238,7 @@ public class TestTxnHandler { public void testLockDbDoesNotLockTableInDifferentDB() throws Exception { // Test that locking a database prevents locking of tables in the database LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -239,6 +246,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); components.clear(); components.add(comp); @@ -251,6 +259,7 @@ public class TestTxnHandler { public void testLockDifferentTables() throws Exception { // Test that two different tables don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("mytable"); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); @@ -259,6 +268,7 @@ public class TestTxnHandler { assertTrue(res.getState() == LockState.ACQUIRED); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); comp.setTablename("yourtable"); components.clear(); components.add(comp); @@ -272,6 +282,7 @@ public class TestTxnHandler { // Test that two different tables don't collide on their locks LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -280,6 +291,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -292,6 +304,7 @@ public class TestTxnHandler { // Test that locking a table prevents locking of partitions of the table LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -301,6 +314,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -313,6 +327,7 @@ public class TestTxnHandler { // Test that locking a table prevents locking of partitions of the table LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -322,6 +337,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("yourtable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -335,6 +351,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -344,6 +361,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("yourpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -357,6 +375,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -366,6 +385,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -379,6 +399,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -388,6 +409,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -401,6 +423,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -410,6 +433,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -419,6 +443,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -432,6 +457,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -441,6 +467,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -455,6 +482,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -464,6 +492,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -473,6 +502,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -487,6 +517,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -496,6 +527,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -509,6 +541,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -518,6 +551,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -527,6 +561,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -540,6 +575,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -550,6 +586,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -563,6 +600,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -573,6 +611,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -583,6 +622,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.INSERT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -591,11 +631,31 @@ public class TestTxnHandler { } @Test + public void testWrongLockForOperation() throws Exception { + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); + Exception expectedError = null; + try { + LockResponse res = txnHandler.lock(req); + } + catch(Exception e) { + expectedError = e; + } + Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType")); + } + @Test public void testLockSWSWSW() throws Exception { // Test that write blocks two writes LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -606,6 +666,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -616,6 +677,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -630,6 +692,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -639,6 +702,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -648,6 +712,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -662,6 +727,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -671,6 +737,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -680,6 +747,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.SELECT); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -692,6 +760,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -704,6 +773,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.UPDATE); components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -733,6 +803,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -755,12 +826,14 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(2); components.add(comp); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("anotherpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); @@ -778,12 +851,14 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(2); components.add(comp); comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("anotherpartition"); + comp.setOperationType(DataOperationType.NO_TXN); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); @@ -794,6 +869,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -813,6 +889,7 @@ public class TestTxnHandler { long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -828,6 +905,7 @@ public class TestTxnHandler { // Test that committing unlocks long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.UPDATE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -847,6 +925,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.DELETE); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -893,6 +972,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -957,6 +1037,7 @@ public class TestTxnHandler { LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); comp.setTablename("mytable"); comp.setPartitionname("mypartition"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -1044,6 +1125,7 @@ public class TestTxnHandler { public void showLocks() throws Exception { long begining = System.currentTimeMillis(); LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setOperationType(DataOperationType.NO_TXN); List<LockComponent> components = new ArrayList<LockComponent>(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); @@ -1053,6 +1135,7 @@ public class TestTxnHandler { long txnid = openTxn(); comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); comp.setTablename("mytable"); + comp.setOperationType(DataOperationType.SELECT); components = new ArrayList<LockComponent>(1); components.add(comp); req = new LockRequest(components, "me", "localhost"); @@ -1064,6 +1147,7 @@ public class TestTxnHandler { comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); comp.setTablename("yourtable"); comp.setPartitionname("yourpartition"); + comp.setOperationType(DataOperationType.INSERT); components.add(comp); req = new LockRequest(components, "you", "remotehost"); res = txnHandler.lock(req); http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 8ee9f4c..38f9803 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -426,7 +426,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable { tbd.getHoldDDLTime(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId()); + SessionState.get().getTxnMgr().getCurrentTxnId(), + work.getLoadTableWork().getWriteType()); console.printInfo("\t Time taken for load dynamic partitions : " + (System.currentTimeMillis() - startTime)); http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 15a61d6..cf93bc5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -28,12 +28,11 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.OutputFormat; import java.io.IOException; import java.util.ArrayList; @@ -219,7 +218,20 @@ public class AcidUtils { return result; } - public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE } + public enum Operation { + NOT_ACID(DataOperationType.UNSET), + INSERT(DataOperationType.INSERT), + UPDATE(DataOperationType.UPDATE), + DELETE(DataOperationType.DELETE); + + private final DataOperationType dop; + private Operation(DataOperationType dop) { + this.dop = dop; + } + public DataOperationType toDataOperationType() { + return dop; + } + } public static interface Directory { http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 21b0cb2..5a7ed17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -161,7 +161,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { boolean atLeastOneLock = false; - LockRequestBuilder rqstBuilder = new LockRequestBuilder(); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(plan.getQueryId()); //link queryId to txnId LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + plan.getQueryId()); rqstBuilder.setTransactionId(txnId) @@ -177,6 +177,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } LockComponentBuilder compBuilder = new LockComponentBuilder(); compBuilder.setShared(); + compBuilder.setOperationType(DataOperationType.SELECT); Table t = null; switch (input.getType()) { @@ -202,6 +203,9 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } + if(t != null && AcidUtils.isAcidTable(t)) { + compBuilder.setIsAcid(true); + } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); rqstBuilder.addLockComponent(comp); @@ -225,27 +229,35 @@ public class DbTxnManager extends HiveTxnManagerImpl { case DDL_EXCLUSIVE: case INSERT_OVERWRITE: compBuilder.setExclusive(); + compBuilder.setOperationType(DataOperationType.NO_TXN); break; case INSERT: - t = output.getTable(); - if(t == null) { - throw new IllegalStateException("No table info for " + output); - } + t = getTable(output); if(AcidUtils.isAcidTable(t)) { compBuilder.setShared(); + compBuilder.setIsAcid(true); } else { compBuilder.setExclusive(); + compBuilder.setIsAcid(false); } + compBuilder.setOperationType(DataOperationType.INSERT); break; case DDL_SHARED: compBuilder.setShared(); + compBuilder.setOperationType(DataOperationType.NO_TXN); break; case UPDATE: + compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.UPDATE); + t = getTable(output); + break; case DELETE: compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.DELETE); + t = getTable(output); break; case DDL_NO_LOCK: @@ -279,12 +291,15 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } + if(t != null && AcidUtils.isAcidTable(t)) { + compBuilder.setIsAcid(true); + } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); rqstBuilder.addLockComponent(comp); atLeastOneLock = true; } - + //plan // Make sure we need locks. It's possible there's nothing to lock in // this operation. if (!atLeastOneLock) { @@ -300,6 +315,13 @@ public class DbTxnManager extends HiveTxnManagerImpl { ctx.setHiveLocks(locks); return lockState; } + private static Table getTable(WriteEntity we) { + Table t = we.getTable(); + if(t == null) { + throw new IllegalStateException("No table info for " + we); + } + return t; + } /** * This is for testing only. * @param delay time to delay for first heartbeat http://git-wip-us.apache.org/repos/asf/hive/blob/c0b532fc/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 20e1ef6..a67f23a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1540,7 +1540,8 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, - int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid, long txnId) + int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid, long txnId, + AcidUtils.Operation operation) throws HiveException { Set<Path> validPartitions = new HashSet<Path>(); @@ -1603,7 +1604,8 @@ private void constructOneLBLocationMap(FileStatus fSta, for (Partition p : partitionsMap.values()) { partNames.add(p.getName()); } - metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(), partNames); + metaStoreClient.addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(), + partNames, operation.toDataOperationType()); } return partitionsMap; } catch (IOException e) {
