Repository: hive
Updated Branches:
  refs/heads/master 793681c76 -> c57a59611
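The diff below threads per-request table properties through the compaction path: a new optional `properties` map on CompactionRequest, an overloaded IMetaStoreClient.compact(), new CQ_TBLPROPERTIES/CC_TBLPROPERTIES columns, a "WITH OVERWRITE TBLPROPERTIES" clause on ALTER TABLE ... COMPACT, and prefix-based overrides applied in CompactorMR. A minimal sketch of how a caller might use the new client overload follows; the database, table, and property values are hypothetical and the client construction is simplified:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.CompactionType;

    public class CompactWithPropsExample {
      public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());

        // On the worker side (CompactorMR.overrideTblProps), request properties are
        // interpreted by prefix:
        //   "compactor.*" -> set on the compaction MR job with the prefix stripped
        //   "tblprops.*"  -> merged into the table properties the job sees
        Map<String, String> props = new HashMap<String, String>();
        props.put("compactor.mapreduce.map.memory.mb", "2048"); // hypothetical override
        props.put("tblprops.orc.compress.size", "8192");        // hypothetical override

        // Equivalent SQL path added by the grammar change:
        //   ALTER TABLE acid_tbl COMPACT 'major' WITH OVERWRITE TBLPROPERTIES (...)
        // partitionName is null here, assuming a non-partitioned table.
        client.compact("default", "acid_tbl", null, CompactionType.MAJOR, props);
      }
    }

Note that the "compactorthreshold." prefix handled in Initiator is read from the table's own tblproperties (t.getParameters()), not from the per-request map shown above.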
http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 4db9680..8d88cd7 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -9984,6 +9984,7 @@ class CompactionRequest: - partitionname - type - runas + - properties """ thrift_spec = ( @@ -9993,14 +9994,16 @@ class CompactionRequest: (3, TType.STRING, 'partitionname', None, None, ), # 3 (4, TType.I32, 'type', None, None, ), # 4 (5, TType.STRING, 'runas', None, None, ), # 5 + (6, TType.MAP, 'properties', (TType.STRING,None,TType.STRING,None), None, ), # 6 ) - def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None,): + def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None,): self.dbname = dbname self.tablename = tablename self.partitionname = partitionname self.type = type self.runas = runas + self.properties = properties def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -10036,6 +10039,17 @@ class CompactionRequest: self.runas = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.MAP: + self.properties = {} + (_ktype463, _vtype464, _size462 ) = iprot.readMapBegin() + for _i466 in xrange(_size462): + _key467 = iprot.readString() + _val468 = iprot.readString() + self.properties[_key467] = _val468 + iprot.readMapEnd() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -10066,6 +10080,14 @@ class CompactionRequest: oprot.writeFieldBegin('runas', TType.STRING, 5) oprot.writeString(self.runas) oprot.writeFieldEnd() + if self.properties is not None: + oprot.writeFieldBegin('properties', TType.MAP, 6) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties)) + for kiter469,viter470 in self.properties.items(): + oprot.writeString(kiter469) + oprot.writeString(viter470) + oprot.writeMapEnd() + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -10086,6 +10108,7 @@ class CompactionRequest: value = (value * 31) ^ hash(self.partitionname) value = (value * 31) ^ hash(self.type) value = (value * 31) ^ hash(self.runas) + value = (value * 31) ^ hash(self.properties) return value def __repr__(self): @@ -10387,11 +10410,11 @@ class ShowCompactResponse: if fid == 1: if ftype == TType.LIST: self.compacts = [] - (_etype465, _size462) = iprot.readListBegin() - for _i466 in xrange(_size462): - _elem467 = ShowCompactResponseElement() - _elem467.read(iprot) - self.compacts.append(_elem467) + (_etype474, _size471) = iprot.readListBegin() + for _i475 in xrange(_size471): + _elem476 = ShowCompactResponseElement() + _elem476.read(iprot) + self.compacts.append(_elem476) iprot.readListEnd() else: iprot.skip(ftype) @@ -10408,8 +10431,8 @@ class ShowCompactResponse: if self.compacts is not None: oprot.writeFieldBegin('compacts', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.compacts)) - for iter468 in self.compacts: - iter468.write(oprot) + for iter477 in self.compacts: + iter477.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10490,10 +10513,10 @@ class 
AddDynamicPartitions: elif fid == 4: if ftype == TType.LIST: self.partitionnames = [] - (_etype472, _size469) = iprot.readListBegin() - for _i473 in xrange(_size469): - _elem474 = iprot.readString() - self.partitionnames.append(_elem474) + (_etype481, _size478) = iprot.readListBegin() + for _i482 in xrange(_size478): + _elem483 = iprot.readString() + self.partitionnames.append(_elem483) iprot.readListEnd() else: iprot.skip(ftype) @@ -10527,8 +10550,8 @@ class AddDynamicPartitions: if self.partitionnames is not None: oprot.writeFieldBegin('partitionnames', TType.LIST, 4) oprot.writeListBegin(TType.STRING, len(self.partitionnames)) - for iter475 in self.partitionnames: - oprot.writeString(iter475) + for iter484 in self.partitionnames: + oprot.writeString(iter484) oprot.writeListEnd() oprot.writeFieldEnd() if self.operationType is not None: @@ -10814,11 +10837,11 @@ class NotificationEventResponse: if fid == 1: if ftype == TType.LIST: self.events = [] - (_etype479, _size476) = iprot.readListBegin() - for _i480 in xrange(_size476): - _elem481 = NotificationEvent() - _elem481.read(iprot) - self.events.append(_elem481) + (_etype488, _size485) = iprot.readListBegin() + for _i489 in xrange(_size485): + _elem490 = NotificationEvent() + _elem490.read(iprot) + self.events.append(_elem490) iprot.readListEnd() else: iprot.skip(ftype) @@ -10835,8 +10858,8 @@ class NotificationEventResponse: if self.events is not None: oprot.writeFieldBegin('events', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.events)) - for iter482 in self.events: - iter482.write(oprot) + for iter491 in self.events: + iter491.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -10957,10 +10980,10 @@ class InsertEventRequestData: if fid == 1: if ftype == TType.LIST: self.filesAdded = [] - (_etype486, _size483) = iprot.readListBegin() - for _i487 in xrange(_size483): - _elem488 = iprot.readString() - self.filesAdded.append(_elem488) + (_etype495, _size492) = iprot.readListBegin() + for _i496 in xrange(_size492): + _elem497 = iprot.readString() + self.filesAdded.append(_elem497) iprot.readListEnd() else: iprot.skip(ftype) @@ -10977,8 +11000,8 @@ class InsertEventRequestData: if self.filesAdded is not None: oprot.writeFieldBegin('filesAdded', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.filesAdded)) - for iter489 in self.filesAdded: - oprot.writeString(iter489) + for iter498 in self.filesAdded: + oprot.writeString(iter498) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11131,10 +11154,10 @@ class FireEventRequest: elif fid == 5: if ftype == TType.LIST: self.partitionVals = [] - (_etype493, _size490) = iprot.readListBegin() - for _i494 in xrange(_size490): - _elem495 = iprot.readString() - self.partitionVals.append(_elem495) + (_etype502, _size499) = iprot.readListBegin() + for _i503 in xrange(_size499): + _elem504 = iprot.readString() + self.partitionVals.append(_elem504) iprot.readListEnd() else: iprot.skip(ftype) @@ -11167,8 +11190,8 @@ class FireEventRequest: if self.partitionVals is not None: oprot.writeFieldBegin('partitionVals', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.partitionVals)) - for iter496 in self.partitionVals: - oprot.writeString(iter496) + for iter505 in self.partitionVals: + oprot.writeString(iter505) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11355,12 +11378,12 @@ class GetFileMetadataByExprResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype498, _vtype499, _size497 ) = 
iprot.readMapBegin() - for _i501 in xrange(_size497): - _key502 = iprot.readI64() - _val503 = MetadataPpdResult() - _val503.read(iprot) - self.metadata[_key502] = _val503 + (_ktype507, _vtype508, _size506 ) = iprot.readMapBegin() + for _i510 in xrange(_size506): + _key511 = iprot.readI64() + _val512 = MetadataPpdResult() + _val512.read(iprot) + self.metadata[_key511] = _val512 iprot.readMapEnd() else: iprot.skip(ftype) @@ -11382,9 +11405,9 @@ class GetFileMetadataByExprResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata)) - for kiter504,viter505 in self.metadata.items(): - oprot.writeI64(kiter504) - viter505.write(oprot) + for kiter513,viter514 in self.metadata.items(): + oprot.writeI64(kiter513) + viter514.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -11454,10 +11477,10 @@ class GetFileMetadataByExprRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype509, _size506) = iprot.readListBegin() - for _i510 in xrange(_size506): - _elem511 = iprot.readI64() - self.fileIds.append(_elem511) + (_etype518, _size515) = iprot.readListBegin() + for _i519 in xrange(_size515): + _elem520 = iprot.readI64() + self.fileIds.append(_elem520) iprot.readListEnd() else: iprot.skip(ftype) @@ -11489,8 +11512,8 @@ class GetFileMetadataByExprRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter512 in self.fileIds: - oprot.writeI64(iter512) + for iter521 in self.fileIds: + oprot.writeI64(iter521) oprot.writeListEnd() oprot.writeFieldEnd() if self.expr is not None: @@ -11564,11 +11587,11 @@ class GetFileMetadataResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype514, _vtype515, _size513 ) = iprot.readMapBegin() - for _i517 in xrange(_size513): - _key518 = iprot.readI64() - _val519 = iprot.readString() - self.metadata[_key518] = _val519 + (_ktype523, _vtype524, _size522 ) = iprot.readMapBegin() + for _i526 in xrange(_size522): + _key527 = iprot.readI64() + _val528 = iprot.readString() + self.metadata[_key527] = _val528 iprot.readMapEnd() else: iprot.skip(ftype) @@ -11590,9 +11613,9 @@ class GetFileMetadataResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata)) - for kiter520,viter521 in self.metadata.items(): - oprot.writeI64(kiter520) - oprot.writeString(viter521) + for kiter529,viter530 in self.metadata.items(): + oprot.writeI64(kiter529) + oprot.writeString(viter530) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -11653,10 +11676,10 @@ class GetFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype525, _size522) = iprot.readListBegin() - for _i526 in xrange(_size522): - _elem527 = iprot.readI64() - self.fileIds.append(_elem527) + (_etype534, _size531) = iprot.readListBegin() + for _i535 in xrange(_size531): + _elem536 = iprot.readI64() + self.fileIds.append(_elem536) iprot.readListEnd() else: iprot.skip(ftype) @@ -11673,8 +11696,8 @@ class GetFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter528 in self.fileIds: - oprot.writeI64(iter528) + for iter537 in self.fileIds: + oprot.writeI64(iter537) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11780,20 +11803,20 @@ 
class PutFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype532, _size529) = iprot.readListBegin() - for _i533 in xrange(_size529): - _elem534 = iprot.readI64() - self.fileIds.append(_elem534) + (_etype541, _size538) = iprot.readListBegin() + for _i542 in xrange(_size538): + _elem543 = iprot.readI64() + self.fileIds.append(_elem543) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.metadata = [] - (_etype538, _size535) = iprot.readListBegin() - for _i539 in xrange(_size535): - _elem540 = iprot.readString() - self.metadata.append(_elem540) + (_etype547, _size544) = iprot.readListBegin() + for _i548 in xrange(_size544): + _elem549 = iprot.readString() + self.metadata.append(_elem549) iprot.readListEnd() else: iprot.skip(ftype) @@ -11815,15 +11838,15 @@ class PutFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter541 in self.fileIds: - oprot.writeI64(iter541) + for iter550 in self.fileIds: + oprot.writeI64(iter550) oprot.writeListEnd() oprot.writeFieldEnd() if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.metadata)) - for iter542 in self.metadata: - oprot.writeString(iter542) + for iter551 in self.metadata: + oprot.writeString(iter551) oprot.writeListEnd() oprot.writeFieldEnd() if self.type is not None: @@ -11931,10 +11954,10 @@ class ClearFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype546, _size543) = iprot.readListBegin() - for _i547 in xrange(_size543): - _elem548 = iprot.readI64() - self.fileIds.append(_elem548) + (_etype555, _size552) = iprot.readListBegin() + for _i556 in xrange(_size552): + _elem557 = iprot.readI64() + self.fileIds.append(_elem557) iprot.readListEnd() else: iprot.skip(ftype) @@ -11951,8 +11974,8 @@ class ClearFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter549 in self.fileIds: - oprot.writeI64(iter549) + for iter558 in self.fileIds: + oprot.writeI64(iter558) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12181,11 +12204,11 @@ class GetAllFunctionsResponse: if fid == 1: if ftype == TType.LIST: self.functions = [] - (_etype553, _size550) = iprot.readListBegin() - for _i554 in xrange(_size550): - _elem555 = Function() - _elem555.read(iprot) - self.functions.append(_elem555) + (_etype562, _size559) = iprot.readListBegin() + for _i563 in xrange(_size559): + _elem564 = Function() + _elem564.read(iprot) + self.functions.append(_elem564) iprot.readListEnd() else: iprot.skip(ftype) @@ -12202,8 +12225,8 @@ class GetAllFunctionsResponse: if self.functions is not None: oprot.writeFieldBegin('functions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.functions)) - for iter556 in self.functions: - iter556.write(oprot) + for iter565 in self.functions: + iter565.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index c7e7cb4..0964cd8 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ 
b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2242,13 +2242,15 @@ class CompactionRequest PARTITIONNAME = 3 TYPE = 4 RUNAS = 5 + PROPERTIES = 6 FIELDS = { DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename'}, PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}, TYPE => {:type => ::Thrift::Types::I32, :name => 'type', :enum_class => ::CompactionType}, - RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runas', :optional => true} + RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runas', :optional => true}, + PROPERTIES => {:type => ::Thrift::Types::MAP, :name => 'properties', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true} } def struct_fields; FIELDS; end http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 16843af..f47dfa1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2148,6 +2148,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient { } @Override + @Deprecated public void compact(String dbname, String tableName, String partitionName, CompactionType type) throws TException { CompactionRequest cr = new CompactionRequest(); @@ -2160,6 +2161,19 @@ public class HiveMetaStoreClient implements IMetaStoreClient { } @Override + public void compact(String dbname, String tableName, String partitionName, CompactionType type, + Map<String, String> tblproperties) throws TException { + CompactionRequest cr = new CompactionRequest(); + if (dbname == null) cr.setDbname(DEFAULT_DATABASE_NAME); + else cr.setDbname(dbname); + cr.setTablename(tableName); + if (partitionName != null) cr.setPartitionname(partitionName); + cr.setType(type); + cr.setProperties(tblproperties); + client.compact(cr); + } + + @Override public ShowCompactResponse showCompactions() throws TException { return client.show_compact(new ShowCompactRequest()); } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 06a1b58..b6fe502 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -1436,10 +1436,28 @@ public interface IMetaStoreClient { * @param type Whether this is a major or minor compaction. * @throws TException */ + @Deprecated void compact(String dbname, String tableName, String partitionName, CompactionType type) throws TException; /** + * Send a request to compact a table or partition. This will not block until the compaction is + * complete. It will instead put a request on the queue for that table or partition to be + * compacted. 
No checking is done on the dbname, tableName, or partitionName to make sure they + * refer to valid objects. It is assumed this has already been done by the caller. + * @param dbname Name of the database the table is in. If null, this will be assumed to be + * 'default'. + * @param tableName Name of the table to be compacted. This cannot be null. If partitionName + * is null, this must be a non-partitioned table. + * @param partitionName Name of the partition to be compacted + * @param type Whether this is a major or minor compaction. + * @param tblproperties the list of tblproperties to override for this compact. Can be null. + * @throws TException + */ + void compact(String dbname, String tableName, String partitionName, CompactionType type, + Map<String, String> tblproperties) throws TException; + + /** * Get a list of all current compactions. * @return List of all current compactions. This includes compactions waiting to happen, * in progress, and finished but waiting to clean the existing files. http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index bea1473..85e0885 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -37,6 +37,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { String workerId; long start; public String runAs; + public String properties; public boolean tooManyAborts = false; /** * {@code 0} means it wasn't set (e.g. 
in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) @@ -102,6 +103,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { "partName:" + partName + "," + "state:" + state + "," + "type:" + type + "," + + "properties:" + properties + "," + "runAs:" + runAs + "," + "tooManyAborts:" + tooManyAborts + "," + "highestTxnId:" + highestTxnId; @@ -120,12 +122,13 @@ public class CompactionInfo implements Comparable<CompactionInfo> { fullCi.partName = rs.getString(4); fullCi.state = rs.getString(5).charAt(0);//cq_state fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0)); - fullCi.workerId = rs.getString(7); - fullCi.start = rs.getLong(8); - fullCi.runAs = rs.getString(9); - fullCi.highestTxnId = rs.getLong(10); - fullCi.metaInfo = rs.getBytes(11); - fullCi.hadoopJobId = rs.getString(12); + fullCi.properties = rs.getString(7); + fullCi.workerId = rs.getString(8); + fullCi.start = rs.getLong(9); + fullCi.runAs = rs.getString(10); + fullCi.highestTxnId = rs.getLong(11); + fullCi.metaInfo = rs.getBytes(12); + fullCi.hadoopJobId = rs.getString(13); return fullCi; } static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { @@ -135,12 +138,13 @@ public class CompactionInfo implements Comparable<CompactionInfo> { pStmt.setString(4, ci.partName); pStmt.setString(5, Character.toString(ci.state)); pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type))); - pStmt.setString(7, ci.workerId); - pStmt.setLong(8, ci.start); - pStmt.setLong(9, endTime); - pStmt.setString(10, ci.runAs); - pStmt.setLong(11, ci.highestTxnId); - pStmt.setBytes(12, ci.metaInfo); - pStmt.setString(13, ci.hadoopJobId); + pStmt.setString(7, ci.properties); + pStmt.setString(8, ci.workerId); + pStmt.setLong(9, ci.start); + pStmt.setLong(10, endTime); + pStmt.setString(11, ci.runAs); + pStmt.setLong(12, ci.highestTxnId); + pStmt.setBytes(13, ci.metaInfo); + pStmt.setString(14, ci.hadoopJobId); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index d2d6462..75a4d87 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -168,7 +168,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -184,6 +184,7 @@ class CompactionTxnHandler extends TxnHandler { info.tableName = rs.getString(3); info.partName = rs.getString(4); info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); + info.properties = rs.getString(6); // Now, update this record as being worked on by this worker. 
long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + @@ -328,7 +329,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); } @@ -344,7 +345,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -837,7 +838,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; @@ -865,7 +866,7 @@ class CompactionTxnHandler extends TxnHandler { } close(rs, stmt, null); - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); int updCount = pStmt.executeUpdate(); LOG.debug("Going to commit"); http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index facce54..60674eb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -123,6 +123,7 @@ public final class TxnDbUtil { " CQ_PARTITION varchar(767)," + " CQ_STATE char(1) NOT NULL," + " CQ_TYPE char(1) NOT NULL," + + " CQ_TBLPROPERTIES varchar(2048)," + " CQ_WORKER_ID varchar(128)," + " CQ_START bigint," + " CQ_RUN_AS varchar(128)," + @@ -140,6 +141,7 @@ public final class TxnDbUtil { " CC_PARTITION varchar(767)," + " CC_STATE char(1) NOT NULL," + " CC_TYPE char(1) NOT NULL," + + " CC_TBLPROPERTIES varchar(2048)," + " CC_WORKER_ID varchar(128)," + " CC_START bigint," + " CC_END bigint," + http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 82d685d..9c1b399 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -39,6 +39,7 @@ import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; @@ -1366,6 +1367,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String partName = rqst.getPartitionname(); if (partName != null) buf.append("cq_partition, "); buf.append("cq_state, cq_type"); + if (rqst.getProperties() != null) { + buf.append(", cq_tblproperties"); + } if (rqst.getRunas() != null) buf.append(", cq_run_as"); buf.append(") values ("); buf.append(id); @@ -1394,6 +1398,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn.rollback(); throw new MetaException("Unexpected compaction type " + rqst.getType().toString()); } + if (rqst.getProperties() != null) { + buf.append("', '"); + buf.append(new StringableMap(rqst.getProperties()).toString()); + } if (rqst.getRunas() != null) { buf.append("', '"); buf.append(rqst.getRunas()); http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index b829d9d..644aed1 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Set; public class TxnUtils { @@ -209,4 +211,56 @@ public class TxnUtils { long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8); return sizeInBytes / 1024 > queryMemoryLimit; } + + public static class StringableMap extends 
HashMap<String, String> { + + public StringableMap(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.parseInt(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.parseInt(parts[0]); + String key = null; + if (len > 0) key = parts[1].substring(0, len); + parts = parts[1].substring(len).split(":", 2); + len = Integer.parseInt(parts[0]); + String value = null; + if (len > 0) value = parts[1].substring(0, len); + s = parts[1].substring(len); + put(key, value); + } + } + + public StringableMap(Map<String, String> m) { + super(m); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Map.Entry<String, String> entry : entrySet()) { + int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); + buf.append(entry.getKey() == null ? 0 : length); + buf.append(':'); + if (length > 0) buf.append(entry.getKey()); + length = (entry.getValue() == null) ? 0 : entry.getValue().length(); + buf.append(length); + buf.append(':'); + if (length > 0) buf.append(entry.getValue()); + } + } + return buf.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + props.putAll(this); + return props; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 717589a..bc39994 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1789,7 +1789,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } partName = partitions.get(0).getName(); } - db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType()); + db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType(), desc.getProps()); console.printInfo("Compaction enqueued."); return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 3fa1233..379eddc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3456,16 +3456,18 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param partName name of the partition, if null table will be compacted (valid only for * non-partitioned tables). 
* @param compactType major or minor + * @param tblproperties the list of tblproperties to overwrite for this compaction * @throws HiveException */ - public void compact(String dbname, String tableName, String partName, String compactType) + public void compact(String dbname, String tableName, String partName, String compactType, + Map<String, String> tblproperties) throws HiveException { try { CompactionType cr = null; if ("major".equals(compactType)) cr = CompactionType.MAJOR; else if ("minor".equals(compactType)) cr = CompactionType.MINOR; else throw new RuntimeException("Unknown compaction type " + compactType); - getMSC().compact(dbname, tableName, partName, cr); + getMSC().compact(dbname, tableName, partName, cr, tblproperties); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 0d735b9..5b32f56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1747,6 +1747,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { AlterTableSimpleDesc desc = new AlterTableSimpleDesc( tableName, newPartSpec, type); + if (ast.getChildCount() > 1) { + HashMap<String, String> mapProp = getProps((ASTNode) (ast.getChild(1)).getChild(0)); + desc.setProps(mapProp); + } + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf)); } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index e0a84c1..c411f5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -1363,8 +1363,8 @@ alterStatementSuffixBucketNum alterStatementSuffixCompact @init { msgs.push("compaction request"); } @after { msgs.pop(); } - : KW_COMPACT compactType=StringLiteral - -> ^(TOK_ALTERTABLE_COMPACT $compactType) + : KW_COMPACT compactType=StringLiteral (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)? + -> ^(TOK_ALTERTABLE_COMPACT $compactType tableProperties?) 
; http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java index d819d15..2ae70bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java @@ -33,6 +33,7 @@ public class AlterTableSimpleDesc extends DDLDesc { private String compactionType; AlterTableTypes type; + private Map<String, String> props; public AlterTableSimpleDesc() { } @@ -99,4 +100,11 @@ public class AlterTableSimpleDesc extends DDLDesc { return compactionType; } + public Map<String, String> getProps() { + return props; + } + + public void setProps(Map<String, String> props) { + this.props = props; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 931be90..3a5a325 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -93,12 +93,16 @@ public class CompactorMR { static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; static final private String TMPDIR = "_tmp"; + static final private String TBLPROPS_PREFIX = "tblprops."; + static final private String COMPACTOR_PREFIX = "compactor."; + + private JobConf mrJob; // the MR job for compaction public CompactorMR() { } private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns) { + ValidTxnList txns, CompactionInfo ci) { JobConf job = new JobConf(conf); job.setJobName(jobName); job.setOutputKeyClass(NullWritable.class); @@ -124,9 +128,52 @@ public class CompactorMR { job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable + if (ci.properties != null) { // override MR properties and general 
tblproperties if applicable + overrideTblProps(job, t.getParameters(), ci.properties); + } setColumnTypes(job, sd.getCols()); return job; } + + /** + * Parse tblproperties specified on "ALTER TABLE ... COMPACT ... WITH OVERWRITE TBLPROPERTIES ..." + * and override two categories of properties: + * 1. properties of the compactor MR job (with prefix "compactor.") + * 2. general hive properties (with prefix "tblprops.") + * @param job the compactor MR job + * @param tblproperties existing tblproperties + * @param properties table properties + */ + private void overrideTblProps(JobConf job, Map<String, String> tblproperties, String properties) { + StringableMap stringableMap = new StringableMap(properties); + overrideMRProps(job, stringableMap); + // mingle existing tblproperties with those specified on the ALTER TABLE command + for (String key : stringableMap.keySet()) { + if (key.startsWith(TBLPROPS_PREFIX)) { + String propKey = key.substring(9); // 9 is the length of "tblprops.". We only keep the rest + tblproperties.put(propKey, stringableMap.get(key)); + } + } + // re-set TABLE_PROPS with reloaded tblproperties + job.set(TABLE_PROPS, new StringableMap(tblproperties).toString()); + } + + /** + * Parse tblproperties to override relevant properties of compactor MR job with specified values. + * For example, compactor.mapreuce.map.memory.mb=1024 + * @param job the compactor MR job + * @param properties table properties + */ + private void overrideMRProps(JobConf job, Map<String, String> properties) { + for (String key : properties.keySet()) { + if (key.startsWith(COMPACTOR_PREFIX)) { + String mrKey = key.substring(10); // 10 is the length of "compactor." We only keep the rest. + job.set(mrKey, properties.get(key)); + } + } + } + /** * Run Compaction which may consist of several jobs on the cluster. * @param conf Hive configuration file @@ -143,7 +190,7 @@ public class CompactorMR { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); } - JobConf job = createBaseJobConf(conf, jobName, t, sd, txns); + JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci); // Figure out and encode what files we need to read. 
We do this here (rather than in // getSplits below) because as part of this we discover our minimum and maximum transactions, @@ -168,11 +215,11 @@ public class CompactorMR { "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API."); int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle; for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) { - JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns); + JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci); launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), - maxDeltastoHandle, -1); + maxDeltastoHandle, -1, conf); } //now recompute state since we've done minor compactions and have different 'best' set of deltas dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); @@ -211,14 +258,14 @@ public class CompactorMR { } launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), - dir.getCurrentDirectories().size(), dir.getObsolete().size()); + dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf); su.gatherStats(); } private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List<AcidUtils.ParsedDelta> parsedDeltas, - int curDirNumber, int obsoleteDirNumber) throws IOException { + int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException { job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); if(dirsToSearch == null) { dirsToSearch = new StringableList(); @@ -240,6 +287,10 @@ public class CompactorMR { job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + mrJob = job; + } + LOG.info("Submitting " + compactionType + " compaction job '" + job.getJobName() + "' to " + job.getQueueName() + " queue. " + "(current delta dirs count=" + curDirNumber + @@ -274,6 +325,10 @@ public class CompactorMR { HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); } + public JobConf getMrJob() { + return mrJob; + } + static class CompactorInputSplit implements InputSplit { private long length = 0; private List<String> locations; @@ -623,58 +678,6 @@ public class CompactorMR { } - static class StringableMap extends HashMap<String, String> { - - StringableMap(String s) { - String[] parts = s.split(":", 2); - // read that many chars - int numElements = Integer.parseInt(parts[0]); - s = parts[1]; - for (int i = 0; i < numElements; i++) { - parts = s.split(":", 2); - int len = Integer.parseInt(parts[0]); - String key = null; - if (len > 0) key = parts[1].substring(0, len); - parts = parts[1].substring(len).split(":", 2); - len = Integer.parseInt(parts[0]); - String value = null; - if (len > 0) value = parts[1].substring(0, len); - s = parts[1].substring(len); - put(key, value); - } - } - - StringableMap(Map<String, String> m) { - super(m); - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append(size()); - buf.append(':'); - if (size() > 0) { - for (Map.Entry<String, String> entry : entrySet()) { - int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); - buf.append(entry.getKey() == null ? 0 : length); - buf.append(':'); - if (length > 0) buf.append(entry.getKey()); - length = (entry.getValue() == null) ? 
0 : entry.getValue().length(); - buf.append(length); - buf.append(':'); - if (length > 0) buf.append(entry.getValue()); - } - } - return buf.toString(); - } - - public Properties toProperties() { - Properties props = new Properties(); - props.putAll(this); - return props; - } - } - static class StringableList extends ArrayList<Path> { StringableList() { http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a55fa1c..8152c89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,6 +59,8 @@ public class Initiator extends CompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; + private long checkInterval; @Override @@ -144,7 +147,7 @@ public class Initiator extends CompactorThread { /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. * Long term we should consider having a thread pool here and running checkForCompactionS * in parallel*/ - CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs); + CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs); if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + @@ -213,6 +216,7 @@ public class Initiator extends CompactorThread { private CompactionType checkForCompaction(final CompactionInfo ci, final ValidTxnList txns, final StorageDescriptor sd, + final Map<String, String> tblproperties, final String runAs) throws IOException, InterruptedException { // If it's marked as too many aborted, we already know we need to compact @@ -222,7 +226,7 @@ public class Initiator extends CompactorThread { return CompactionType.MAJOR; } if (runJobAsSelf(runAs)) { - return determineCompactionType(ci, txns, sd); + return determineCompactionType(ci, txns, sd, tblproperties); } else { LOG.info("Going to initiate as user " + runAs); UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, @@ -230,7 +234,7 @@ public class Initiator extends CompactorThread { CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() { @Override public CompactionType run() throws Exception { - return determineCompactionType(ci, txns, sd); + return determineCompactionType(ci, txns, sd, tblproperties); } }); try { @@ -244,7 +248,7 @@ public class Initiator extends CompactorThread { } private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns, - StorageDescriptor sd) + StorageDescriptor sd, Map<String, String> tblproperties) throws IOException, InterruptedException { boolean noBase = false; Path location = new Path(sd.getLocation()); @@ -282,8 +286,11 @@ 
public class Initiator extends CompactorThread { if (baseSize == 0 && deltaSize > 0) { noBase = true; } else { - float deltaPctThreshold = HiveConf.getFloatVar(conf, + String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD); + float deltaPctThreshold = deltaPctProp == null ? + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : + Float.parseFloat(deltaPctProp); boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold; if (LOG.isDebugEnabled()) { StringBuilder msg = new StringBuilder("delta size: "); @@ -299,8 +306,11 @@ public class Initiator extends CompactorThread { if (bigEnough) return CompactionType.MAJOR; } - int deltaNumThreshold = HiveConf.getIntVar(conf, + String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD); + int deltaNumThreshold = deltaNumProp == null ? + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : + Integer.parseInt(deltaNumProp); boolean enough = deltas.size() > deltaNumThreshold; if (enough) { LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold + http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 767c10c..666f13b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.ValidTxnList; @@ -56,6 +57,7 @@ public class Worker extends CompactorThread { static final private int baseThreadNum = 10002; private String name; + private JobConf mrJob; // the MR job for compaction /** * Get the hostname that this worker is run on. Made static and public so that other classes @@ -180,6 +182,9 @@ public class Worker extends CompactorThread { } } txnHandler.markCompacted(ci); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + mrJob = mr.getMrJob(); + } } catch (Exception e) { LOG.error("Caught exception while trying to compact " + ci + ". 
Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); @@ -213,6 +218,10 @@ public class Worker extends CompactorThread { setName(name.toString()); } + public JobConf getMrJob() { + return mrJob; + } + static final class StatsUpdater { static final private Logger LOG = LoggerFactory.getLogger(StatsUpdater.class); http://git-wip-us.apache.org/repos/asf/hive/blob/c57a5961/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index cf7eb70..ef7804c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.junit.Assert; import org.junit.Test; @@ -77,19 +78,19 @@ public class TestWorker extends CompactorTest { @Test public void stringableMap() throws Exception { // Empty map case - CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>()); + StringableMap m = new StringableMap(new HashMap<String, String>()); String s = m.toString(); Assert.assertEquals("0:", s); - m = new CompactorMR.StringableMap(s); + m = new StringableMap(s); Assert.assertEquals(0, m.size()); Map<String, String> base = new HashMap<String, String>(); base.put("mary", "poppins"); base.put("bert", null); base.put(null, "banks"); - m = new CompactorMR.StringableMap(base); + m = new StringableMap(base); s = m.toString(); - m = new CompactorMR.StringableMap(s); + m = new StringableMap(s); Assert.assertEquals(3, m.size()); Map<String, Boolean> saw = new HashMap<String, Boolean>(3); saw.put("mary", false);