http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 031e72b..1285c08 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -14681,6 +14681,7 @@ class CreationMetadata: - tblName - tablesUsed - validTxnList + - materializationTime """ thrift_spec = ( @@ -14690,14 +14691,16 @@ class CreationMetadata: (3, TType.STRING, 'tblName', None, None, ), # 3 (4, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 4 (5, TType.STRING, 'validTxnList', None, None, ), # 5 + (6, TType.I64, 'materializationTime', None, None, ), # 6 ) - def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None,): + def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None, materializationTime=None,): self.catName = catName self.dbName = dbName self.tblName = tblName self.tablesUsed = tablesUsed self.validTxnList = validTxnList + self.materializationTime = materializationTime def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -14738,6 +14741,11 @@ class CreationMetadata: self.validTxnList = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I64: + self.materializationTime = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -14771,6 +14779,10 @@ class CreationMetadata: oprot.writeFieldBegin('validTxnList', TType.STRING, 5) oprot.writeString(self.validTxnList) oprot.writeFieldEnd() + if self.materializationTime is not None: + oprot.writeFieldBegin('materializationTime', TType.I64, 6) + oprot.writeI64(self.materializationTime) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -14793,6 +14805,7 @@ class CreationMetadata: value = (value * 31) ^ hash(self.tblName) value = (value * 31) ^ hash(self.tablesUsed) value = (value * 31) ^ hash(self.validTxnList) + value = (value * 31) ^ hash(self.materializationTime) return value def __repr__(self): @@ -17613,24 +17626,15 @@ class TableMeta: class Materialization: """ Attributes: - - tablesUsed - - validTxnList - - invalidationTime - sourceTablesUpdateDeleteModified """ thrift_spec = ( None, # 0 - (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1 - (2, TType.STRING, 'validTxnList', None, None, ), # 2 - (3, TType.I64, 'invalidationTime', None, None, ), # 3 - (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4 + (1, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 1 ) - def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,): - self.tablesUsed = tablesUsed - self.validTxnList = validTxnList - self.invalidationTime = invalidationTime + def __init__(self, sourceTablesUpdateDeleteModified=None,): self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified def read(self, iprot): @@ -17643,26 +17647,6 @@ class Materialization: if ftype == TType.STOP: break if fid == 1: - if ftype == TType.SET: - self.tablesUsed = set() - (_etype763, _size760) = iprot.readSetBegin() - for _i764 in xrange(_size760): - _elem765 = iprot.readString() - self.tablesUsed.add(_elem765) - iprot.readSetEnd() - else: - iprot.skip(ftype) - elif fid == 2: - if ftype == TType.STRING: - self.validTxnList = iprot.readString() - else: - iprot.skip(ftype) - elif fid == 3: - if ftype == TType.I64: - self.invalidationTime = iprot.readI64() - else: - iprot.skip(ftype) - elif fid == 4: if ftype == TType.BOOL: self.sourceTablesUpdateDeleteModified = iprot.readBool() else: @@ -17677,39 +17661,21 @@ class Materialization: oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) return oprot.writeStructBegin('Materialization') - if self.tablesUsed is not None: - oprot.writeFieldBegin('tablesUsed', TType.SET, 1) - oprot.writeSetBegin(TType.STRING, len(self.tablesUsed)) - for iter766 in self.tablesUsed: - oprot.writeString(iter766) - oprot.writeSetEnd() - oprot.writeFieldEnd() - if self.validTxnList is not None: - oprot.writeFieldBegin('validTxnList', TType.STRING, 2) - oprot.writeString(self.validTxnList) - oprot.writeFieldEnd() - if self.invalidationTime is not None: - oprot.writeFieldBegin('invalidationTime', TType.I64, 3) - oprot.writeI64(self.invalidationTime) - oprot.writeFieldEnd() if self.sourceTablesUpdateDeleteModified is not None: - oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4) + oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 1) oprot.writeBool(self.sourceTablesUpdateDeleteModified) oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() def validate(self): - if self.tablesUsed is None: - raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!') + if self.sourceTablesUpdateDeleteModified is None: + raise TProtocol.TProtocolException(message='Required field sourceTablesUpdateDeleteModified is unset!') return def __hash__(self): value = 17 - value = (value * 31) ^ hash(self.tablesUsed) - value = (value * 31) ^ hash(self.validTxnList) - value = (value * 31) ^ hash(self.invalidationTime) value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified) return value @@ -18586,44 +18552,44 @@ class WMFullResourcePlan: elif fid == 2: if ftype == TType.LIST: self.pools = [] - (_etype770, _size767) = iprot.readListBegin() - for _i771 in xrange(_size767): - _elem772 = WMPool() - _elem772.read(iprot) - self.pools.append(_elem772) + (_etype763, _size760) = iprot.readListBegin() + for _i764 in xrange(_size760): + _elem765 = WMPool() + _elem765.read(iprot) + self.pools.append(_elem765) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.LIST: self.mappings = [] - (_etype776, _size773) = iprot.readListBegin() - for _i777 in xrange(_size773): - _elem778 = WMMapping() - _elem778.read(iprot) - self.mappings.append(_elem778) + (_etype769, _size766) = iprot.readListBegin() + for _i770 in xrange(_size766): + _elem771 = WMMapping() + _elem771.read(iprot) + self.mappings.append(_elem771) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.LIST: self.triggers = [] - (_etype782, _size779) = iprot.readListBegin() - for _i783 in xrange(_size779): - _elem784 = WMTrigger() - _elem784.read(iprot) - self.triggers.append(_elem784) + (_etype775, _size772) = iprot.readListBegin() + for _i776 in xrange(_size772): + _elem777 = WMTrigger() + _elem777.read(iprot) + self.triggers.append(_elem777) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.LIST: self.poolTriggers = [] - (_etype788, _size785) = iprot.readListBegin() - for _i789 in xrange(_size785): - _elem790 = WMPoolTrigger() - _elem790.read(iprot) - self.poolTriggers.append(_elem790) + (_etype781, _size778) = iprot.readListBegin() + for _i782 in xrange(_size778): + _elem783 = WMPoolTrigger() + _elem783.read(iprot) + self.poolTriggers.append(_elem783) iprot.readListEnd() else: iprot.skip(ftype) @@ -18644,29 +18610,29 @@ class WMFullResourcePlan: if self.pools is not None: oprot.writeFieldBegin('pools', TType.LIST, 2) oprot.writeListBegin(TType.STRUCT, len(self.pools)) - for iter791 in self.pools: - iter791.write(oprot) + for iter784 in self.pools: + iter784.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.mappings is not None: oprot.writeFieldBegin('mappings', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.mappings)) - for iter792 in self.mappings: - iter792.write(oprot) + for iter785 in self.mappings: + iter785.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter793 in self.triggers: - iter793.write(oprot) + for iter786 in self.triggers: + iter786.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.poolTriggers is not None: oprot.writeFieldBegin('poolTriggers', TType.LIST, 5) oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers)) - for iter794 in self.poolTriggers: - iter794.write(oprot) + for iter787 in self.poolTriggers: + iter787.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -19140,11 +19106,11 @@ class WMGetAllResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.resourcePlans = [] - (_etype798, _size795) = iprot.readListBegin() - for _i799 in xrange(_size795): - _elem800 = WMResourcePlan() - _elem800.read(iprot) - self.resourcePlans.append(_elem800) + (_etype791, _size788) = iprot.readListBegin() + for _i792 in xrange(_size788): + _elem793 = WMResourcePlan() + _elem793.read(iprot) + self.resourcePlans.append(_elem793) iprot.readListEnd() else: iprot.skip(ftype) @@ -19161,8 +19127,8 @@ class WMGetAllResourcePlanResponse: if self.resourcePlans is not None: oprot.writeFieldBegin('resourcePlans', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans)) - for iter801 in self.resourcePlans: - iter801.write(oprot) + for iter794 in self.resourcePlans: + iter794.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -19466,20 +19432,20 @@ class WMValidateResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.errors = [] - (_etype805, _size802) = iprot.readListBegin() - for _i806 in xrange(_size802): - _elem807 = iprot.readString() - self.errors.append(_elem807) + (_etype798, _size795) = iprot.readListBegin() + for _i799 in xrange(_size795): + _elem800 = iprot.readString() + self.errors.append(_elem800) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.warnings = [] - (_etype811, _size808) = iprot.readListBegin() - for _i812 in xrange(_size808): - _elem813 = iprot.readString() - self.warnings.append(_elem813) + (_etype804, _size801) = iprot.readListBegin() + for _i805 in xrange(_size801): + _elem806 = iprot.readString() + self.warnings.append(_elem806) iprot.readListEnd() else: iprot.skip(ftype) @@ -19496,15 +19462,15 @@ class WMValidateResourcePlanResponse: if self.errors is not None: oprot.writeFieldBegin('errors', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.errors)) - for iter814 in self.errors: - oprot.writeString(iter814) + for iter807 in self.errors: + oprot.writeString(iter807) oprot.writeListEnd() oprot.writeFieldEnd() if self.warnings is not None: oprot.writeFieldBegin('warnings', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.warnings)) - for iter815 in self.warnings: - oprot.writeString(iter815) + for iter808 in self.warnings: + oprot.writeString(iter808) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -20081,11 +20047,11 @@ class WMGetTriggersForResourePlanResponse: if fid == 1: if ftype == TType.LIST: self.triggers = [] - (_etype819, _size816) = iprot.readListBegin() - for _i820 in xrange(_size816): - _elem821 = WMTrigger() - _elem821.read(iprot) - self.triggers.append(_elem821) + (_etype812, _size809) = iprot.readListBegin() + for _i813 in xrange(_size809): + _elem814 = WMTrigger() + _elem814.read(iprot) + self.triggers.append(_elem814) iprot.readListEnd() else: iprot.skip(ftype) @@ -20102,8 +20068,8 @@ class WMGetTriggersForResourePlanResponse: if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter822 in self.triggers: - iter822.write(oprot) + for iter815 in self.triggers: + iter815.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -21287,11 +21253,11 @@ class SchemaVersion: elif fid == 4: if ftype == TType.LIST: self.cols = [] - (_etype826, _size823) = iprot.readListBegin() - for _i827 in xrange(_size823): - _elem828 = FieldSchema() - _elem828.read(iprot) - self.cols.append(_elem828) + (_etype819, _size816) = iprot.readListBegin() + for _i820 in xrange(_size816): + _elem821 = FieldSchema() + _elem821.read(iprot) + self.cols.append(_elem821) iprot.readListEnd() else: iprot.skip(ftype) @@ -21351,8 +21317,8 @@ class SchemaVersion: if self.cols is not None: oprot.writeFieldBegin('cols', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.cols)) - for iter829 in self.cols: - iter829.write(oprot) + for iter822 in self.cols: + iter822.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.state is not None: @@ -21607,11 +21573,11 @@ class FindSchemasByColsResp: if fid == 1: if ftype == TType.LIST: self.schemaVersions = [] - (_etype833, _size830) = iprot.readListBegin() - for _i834 in xrange(_size830): - _elem835 = SchemaVersionDescriptor() - _elem835.read(iprot) - self.schemaVersions.append(_elem835) + (_etype826, _size823) = iprot.readListBegin() + for _i827 in xrange(_size823): + _elem828 = SchemaVersionDescriptor() + _elem828.read(iprot) + self.schemaVersions.append(_elem828) iprot.readListEnd() else: iprot.skip(ftype) @@ -21628,8 +21594,8 @@ class FindSchemasByColsResp: if self.schemaVersions is not None: oprot.writeFieldBegin('schemaVersions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions)) - for iter836 in self.schemaVersions: - iter836.write(oprot) + for iter829 in self.schemaVersions: + iter829.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 0348ff2..a0fabfe 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3277,13 +3277,15 @@ class CreationMetadata TBLNAME = 3 TABLESUSED = 4 VALIDTXNLIST = 5 + MATERIALIZATIONTIME = 6 FIELDS = { CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}, TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, - VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true} + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, + MATERIALIZATIONTIME => {:type => ::Thrift::Types::I64, :name => 'materializationTime', :optional => true} } def struct_fields; FIELDS; end @@ -3952,22 +3954,16 @@ end class Materialization include ::Thrift::Struct, ::Thrift::Struct_Union - TABLESUSED = 1 - VALIDTXNLIST = 2 - INVALIDATIONTIME = 3 - SOURCETABLESUPDATEDELETEMODIFIED = 4 + SOURCETABLESUPDATEDELETEMODIFIED = 1 FIELDS = { - TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, - VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, - INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true}, - SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => true} + SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified'} } def struct_fields; FIELDS; end def validate - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field sourceTablesUpdateDeleteModified is unset!') if @sourceTablesUpdateDeleteModified.nil? end ::Thrift::Struct.generate_accessors self http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index 2bd958e..5ecfbed 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -726,13 +726,13 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_table_objects_by_name_req failed: unknown result') end - def get_materialization_invalidation_info(dbname, tbl_names) - send_get_materialization_invalidation_info(dbname, tbl_names) + def get_materialization_invalidation_info(creation_metadata, validTxnList) + send_get_materialization_invalidation_info(creation_metadata, validTxnList) return recv_get_materialization_invalidation_info() end - def send_get_materialization_invalidation_info(dbname, tbl_names) - send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :dbname => dbname, :tbl_names => tbl_names) + def send_get_materialization_invalidation_info(creation_metadata, validTxnList) + send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :creation_metadata => creation_metadata, :validTxnList => validTxnList) end def recv_get_materialization_invalidation_info() @@ -4043,7 +4043,7 @@ module ThriftHiveMetastore args = read_args(iprot, Get_materialization_invalidation_info_args) result = Get_materialization_invalidation_info_result.new() begin - result.success = @handler.get_materialization_invalidation_info(args.dbname, args.tbl_names) + result.success = @handler.get_materialization_invalidation_info(args.creation_metadata, args.validTxnList) rescue ::MetaException => o1 result.o1 = o1 rescue ::InvalidOperationException => o2 @@ -7654,12 +7654,12 @@ module ThriftHiveMetastore class Get_materialization_invalidation_info_args include ::Thrift::Struct, ::Thrift::Struct_Union - DBNAME = 1 - TBL_NAMES = 2 + CREATION_METADATA = 1 + VALIDTXNLIST = 2 FIELDS = { - DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, - TBL_NAMES => {:type => ::Thrift::Types::LIST, :name => 'tbl_names', :element => {:type => ::Thrift::Types::STRING}} + CREATION_METADATA => {:type => ::Thrift::Types::STRUCT, :name => 'creation_metadata', :class => ::CreationMetadata}, + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList'} } def struct_fields; FIELDS; end @@ -7678,7 +7678,7 @@ module ThriftHiveMetastore O3 = 3 FIELDS = { - SUCCESS => {:type => ::Thrift::Types::MAP, :name => 'success', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::Materialization}}, + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::Materialization}, O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => ::MetaException}, O2 => {:type => ::Thrift::Types::STRUCT, :name => 'o2', :class => ::InvalidOperationException}, O3 => {:type => ::Thrift::Types::STRUCT, :name => 'o3', :class => ::UnknownDBException} http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 8d88749..e6f7333 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3009,8 +3009,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { } @Override - public Map<String, Materialization> get_materialization_invalidation_info(final String dbName, final List<String> tableNames) { - return MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(dbName, tableNames); + public Materialization get_materialization_invalidation_info(final CreationMetadata cm, final String validTxnList) throws MetaException { + return getTxnHandler().getMaterializationInvalidationInfo(cm, validTxnList); } @Override @@ -8670,13 +8670,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId) throws TException { - return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId); + return getTxnHandler().lockMaterializationRebuild(dbName, tableName, txnId); } @Override public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId) throws TException { - return MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId); + return getTxnHandler().heartbeatLockMaterializationRebuild(dbName, tableName, txnId); } @Override @@ -8992,8 +8992,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { false); IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf); - // Initialize materializations invalidation cache - MaterializationsInvalidationCache.get().init(conf, handler); TServerSocket serverSocket; if (useSasl) { http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index bfd7141..acdb73b 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -167,8 +167,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { // instantiate the metastore server handler directly instead of connecting // through the network client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, true); - // Initialize materializations invalidation cache (only for local metastore) - MaterializationsInvalidationCache.get().init(conf, (IHMSHandler) client); isConnected = true; snapshotActiveConf(); return; @@ -1610,10 +1608,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override - public Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames) + public Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList) throws MetaException, InvalidOperationException, UnknownDBException, TException { - return client.get_materialization_invalidation_info( - dbName, filterHook.filterTableNames(getDefaultCatalog(conf), dbName, viewNames)); + return client.get_materialization_invalidation_info(cm, validTxnList); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index b5d147b..9661beb 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -775,7 +775,7 @@ public interface IMetaStoreClient { /** * Returns the invalidation information for the materialized views given as input. */ - Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames) + Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList) throws MetaException, InvalidOperationException, UnknownDBException, TException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java deleted file mode 100644 index cc168a9..0000000 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Task responsible for cleaning the transactions that are not useful from the - * materializations cache. - */ -public class MaterializationsCacheCleanerTask implements MetastoreTaskThread { - private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class); - - private Configuration conf; - - @Override - public long runFrequency(TimeUnit unit) { - return MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit); - } - - @Override - public void setConf(Configuration configuration) { - conf = configuration; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void run() { - long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() - - MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS)); - if (removedCnt > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java deleted file mode 100644 index fc644f0..0000000 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ /dev/null @@ -1,543 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.hadoop.conf.Configuration; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.Materialization; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -/** - * This cache keeps information in memory about the table modifications so materialized views - * can verify their invalidation time, i.e., the moment after materialization on which the - * first transaction to the tables they used happened. This information is kept in memory - * to check the invalidation quickly. However, we store enough information in the metastore - * to bring this cache up if the metastore is restarted or would crashed. This cache lives - * in the metastore server. - */ -public final class MaterializationsInvalidationCache { - - private static final Logger LOG = LoggerFactory.getLogger(MaterializationsInvalidationCache.class); - - /* Singleton */ - private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache(); - - /* If this boolean is true, this class has no functionality. Only for debugging purposes. */ - private boolean disable; - - /* Key is the database name. Each value is a map from the unique view qualified name to - * the materialization invalidation info. This invalidation object contains information - * such as the tables used by the materialized view, whether there was any update or - * delete in the source tables since the materialized view was created or rebuilt, - * or the invalidation time, i.e., first modification of the tables used by materialized - * view after the view was created. */ - private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations = - new ConcurrentHashMap<>(); - - /* - * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent - * modifications) that will keep the modifications for a given table in the order of their - * transaction id. This is useful to quickly check the invalidation time for a given - * materialization. - */ - private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications = - new ConcurrentHashMap<>(); - - private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications = - new ConcurrentHashMap<>(); - - /* Whether the cache has been initialized or not. */ - private boolean initialized; - /* Configuration for cache. */ - private Configuration conf; - /* Handler to connect to metastore. */ - private IHMSHandler handler; - - private MaterializationsInvalidationCache() { - } - - /** - * Get instance of MaterializationsInvalidationCache. - * - * @return the singleton - */ - public static MaterializationsInvalidationCache get() { - return SINGLETON; - } - - /** - * Initialize the invalidation cache. - * - * The method is synchronized because we want to avoid initializing the invalidation cache - * multiple times in embedded mode. This will not happen when we run the metastore remotely - * as the method is called only once. - */ - public synchronized void init(Configuration conf, IHMSHandler handler) { - this.conf = conf; - this.handler = handler; - - // This will only be true for debugging purposes - this.disable = MetastoreConf.getVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE"); - if (disable) { - // Nothing to do - return; - } - - if (!initialized) { - this.initialized = true; - ExecutorService pool = Executors.newCachedThreadPool(); - pool.submit(new Loader()); - pool.shutdown(); - } - } - - private class Loader implements Runnable { - @Override - public void run() { - try { - RawStore store = handler.getMS(); - for (String catName : store.getCatalogs()) { - for (String dbName : store.getAllDatabases(catName)) { - for (Table mv : store.getTableObjectsByName(catName, dbName, - store.getTables(catName, dbName, null, TableType.MATERIALIZED_VIEW))) { - addMaterializedView(mv.getDbName(), mv.getTableName(), ImmutableSet.copyOf(mv.getCreationMetadata().getTablesUsed()), - mv.getCreationMetadata().getValidTxnList(), OpType.LOAD); - } - } - } - LOG.info("Initialized materializations invalidation cache"); - } catch (Exception e) { - LOG.error("Problem connecting to the metastore when initializing the view registry"); - } - } - } - - /** - * Adds a newly created materialized view to the cache. - * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - */ - public void createMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList) { - addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.CREATE); - } - - /** - * Method to call when materialized view is modified. - * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - */ - public void alterMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList) { - addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.ALTER); - } - - /** - * Adds the materialized view to the cache. - * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - * @param opType - */ - private void addMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList, OpType opType) { - if (disable) { - // Nothing to do - return; - } - // We are going to create the map for each view in the given database - ConcurrentMap<String, Materialization> cq = - new ConcurrentHashMap<String, Materialization>(); - final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent( - dbName, cq); - if (prevCq != null) { - cq = prevCq; - } - // Start the process to add materialization to the cache - // Before loading the materialization in the cache, we need to update some - // important information in the registry to account for rewriting invalidation - if (validTxnList == null) { - // This can happen when the materialized view was created on non-transactional tables - return; - } - if (opType == OpType.CREATE || opType == OpType.ALTER) { - // You store the materialized view - Materialization materialization = new Materialization(tablesUsed); - materialization.setValidTxnList(validTxnList); - cq.put(tableName, materialization); - } else { - ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList); - for (String qNameTableUsed : tablesUsed) { - ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); - // First we insert a new tree set to keep table modifications, unless it already exists - ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); - final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent( - qNameTableUsed, modificationsTree); - if (prevModificationsTree != null) { - modificationsTree = prevModificationsTree; - } - // If we are not creating the MV at this instant, but instead it was created previously - // and we are loading it into the cache, we need to go through the transaction entries and - // check if the MV is still valid. - try { - String[] names = qNameTableUsed.split("\\."); - BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit( - names[0], names[1], tableTxnList); - if (!e.isIsnull()) { - modificationsTree.put(e.getTxnid(), e.getTime()); - // We do not need to do anything more for current table, as we detected - // a modification event that was in the metastore. - continue; - } - } catch (MetaException ex) { - LOG.debug("Materialized view " + Warehouse.getQualifiedName(dbName, tableName) + - " ignored; error loading view into invalidation cache", ex); - return; - } - } - // For LOAD, you only add it if it does exist as you might be loading an outdated MV - Materialization materialization = new Materialization(tablesUsed); - materialization.setValidTxnList(validTxnList); - cq.putIfAbsent(tableName, materialization); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Cached materialized view for rewriting in invalidation cache: " + - Warehouse.getQualifiedName(dbName, tableName)); - } - } - - /** - * This method is called when a table is modified. That way we can keep track of the - * invalidation for the MVs that use that table. - */ - public void notifyTableModification(String dbName, String tableName, - long txnId, long newModificationTime, boolean isUpdateDelete) { - if (disable) { - // Nothing to do - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}", - tableName, dbName, txnId, newModificationTime); - } - if (isUpdateDelete) { - // We update first the update/delete modifications record - ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>(); - final ConcurrentSkipListSet<Long> prevModificationsSet = - updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), - modificationsSet); - if (prevModificationsSet != null) { - modificationsSet = prevModificationsSet; - } - modificationsSet.add(txnId); - } - ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); - final ConcurrentSkipListMap<Long, Long> prevModificationsTree = - tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree); - if (prevModificationsTree != null) { - modificationsTree = prevModificationsTree; - } - modificationsTree.put(txnId, newModificationTime); - } - - /** - * Removes the materialized view from the cache. - * - * @param dbName - * @param tableName - */ - public void dropMaterializedView(String dbName, String tableName) { - if (disable) { - // Nothing to do - return; - } - materializations.get(dbName).remove(tableName); - } - - /** - * Returns the materialized views in the cache for the given database. - * - * @param dbName the database - * @return the collection of materialized views, or the empty collection if none - */ - public Map<String, Materialization> getMaterializationInvalidationInfo( - String dbName, List<String> materializationNames) { - if (materializations.get(dbName) != null) { - ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder(); - for (String materializationName : materializationNames) { - Materialization materialization = - materializations.get(dbName).get(materializationName); - if (materialization == null) { - LOG.debug("Materialization {} skipped as there is no information " - + "in the invalidation cache about it", materializationName); - continue; - } - // We create a deep copy of the materialization, as we need to set the time - // and whether any update/delete operation happen on the tables that it uses - // since it was created. - Materialization materializationCopy = new Materialization( - materialization.getTablesUsed()); - materializationCopy.setValidTxnList(materialization.getValidTxnList()); - enrichWithInvalidationInfo(materializationCopy); - m.put(materializationName, materializationCopy); - } - Map<String, Materialization> result = m.build(); - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved the following materializations from the invalidation cache: {}", result); - } - return result; - } - return ImmutableMap.of(); - } - - private void enrichWithInvalidationInfo(Materialization materialization) { - String materializationTxnListString = materialization.getValidTxnList(); - if (materializationTxnListString == null) { - // This can happen when the materialization was created on non-transactional tables - materialization.setInvalidationTime(Long.MIN_VALUE); - return; - } - - // We will obtain the modification time as follows. - // First, we obtain the first element after high watermark (if any) - // Then, we iterate through the elements from min open txn till high - // watermark, updating the modification time after creation if needed - ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString); - long firstModificationTimeAfterCreation = 0L; - boolean containsUpdateDelete = false; - for (String qNameTableUsed : materialization.getTablesUsed()) { - final ValidWriteIdList tableMaterializationTxnList = - materializationTxnList.getTableValidWriteIdList(qNameTableUsed); - - final ConcurrentSkipListMap<Long, Long> usedTableModifications = - tableModifications.get(qNameTableUsed); - if (usedTableModifications == null) { - // This is not necessarily an error, since the table may be empty. To be safe, - // instead of including this materialized view, we just log the information and - // skip it (if table is really empty, it will not matter for performance anyway). - LOG.warn("No information found in invalidation cache for table {}, possible tables are: {}", - qNameTableUsed, tableModifications.keySet()); - materialization.setInvalidationTime(Long.MIN_VALUE); - return; - } - final ConcurrentSkipListSet<Long> usedUDTableModifications = - updateDeleteTableModifications.get(qNameTableUsed); - final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark()); - if (tn != null) { - if (firstModificationTimeAfterCreation == 0L || - tn.getValue() < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = tn.getValue(); - } - // Check if there was any update/delete after creation - containsUpdateDelete = usedUDTableModifications != null && - !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty(); - } - // Min open txn might be null if there were no open transactions - // when this transaction was being executed - if (tableMaterializationTxnList.getMinOpenWriteId() != null) { - // Invalid transaction list is sorted - int pos = 0; - for (Map.Entry<Long, Long> t : usedTableModifications - .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) { - while (pos < tableMaterializationTxnList.getInvalidWriteIds().length && - tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) { - pos++; - } - if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) { - break; - } - if (firstModificationTimeAfterCreation == 0L || - t.getValue() < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = t.getValue(); - } - containsUpdateDelete = containsUpdateDelete || - (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey())); - } - } - } - - materialization.setInvalidationTime(firstModificationTimeAfterCreation); - materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete); - } - - private enum OpType { - CREATE, - LOAD, - ALTER - } - - /** - * Removes transaction events that are not relevant anymore. - * @param minTime events generated before this time (ms) can be deleted from the cache - * @return number of events that were deleted from the cache - */ - public long cleanup(long minTime) { - // To remove, mv should meet two conditions: - // 1) Current time - time of transaction > config parameter, and - // 2) Transaction should not be associated with invalidation of a MV - if (disable || !initialized) { - // Bail out - return 0L; - } - // We execute the cleanup in two steps - // First we gather all the transactions that need to be kept - final Multimap<String, Long> keepTxnInfos = HashMultimap.create(); - for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) { - for (Materialization m : e.getValue().values()) { - ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList()); - boolean canBeDeleted = false; - String currentTableForInvalidatingTxn = null; - long currentInvalidatingTxnId = 0L; - long currentInvalidatingTxnTime = 0L; - for (String qNameTableUsed : m.getTablesUsed()) { - ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); - final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed) - .higherEntry(tableTxnList.getHighWatermark()); - if (tn != null) { - if (currentInvalidatingTxnTime == 0L || - tn.getValue() < currentInvalidatingTxnTime) { - // This transaction 1) is the first one examined for this materialization, or - // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. - // 1.- We remove the previous invalidating transaction from the transactions - // to be kept (if needed). - if (canBeDeleted && currentInvalidatingTxnTime < minTime) { - keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); - } - // 2.- We add this transaction to the transactions that should be kept. - canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey()); - keepTxnInfos.put(qNameTableUsed, tn.getKey()); - // 3.- We record this transaction as the current invalidating transaction. - currentTableForInvalidatingTxn = qNameTableUsed; - currentInvalidatingTxnId = tn.getKey(); - currentInvalidatingTxnTime = tn.getValue(); - } - } - if (tableTxnList.getMinOpenWriteId() != null) { - // Invalid transaction list is sorted - int pos = 0; - for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed) - .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) { - while (pos < tableTxnList.getInvalidWriteIds().length && - tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) { - pos++; - } - if (pos >= tableTxnList.getInvalidWriteIds().length) { - break; - } - if (currentInvalidatingTxnTime == 0L || - t.getValue() < currentInvalidatingTxnTime) { - // This transaction 1) is the first one examined for this materialization, or - // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. - // 1.- We remove the previous invalidating transaction from the transactions - // to be kept (if needed). - if (canBeDeleted && currentInvalidatingTxnTime < minTime) { - keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); - } - // 2.- We add this transaction to the transactions that should be kept. - canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey()); - keepTxnInfos.put(qNameTableUsed, t.getKey()); - // 3.- We record this transaction as the current invalidating transaction. - currentTableForInvalidatingTxn = qNameTableUsed; - currentInvalidatingTxnId = t.getKey(); - currentInvalidatingTxnTime = t.getValue(); - } - } - } - } - } - } - // Second, we remove the transactions - long removed = 0L; - for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) { - Collection<Long> c = keepTxnInfos.get(e.getKey()); - ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey()); - for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) { - Entry<Long, Long> v = it.next(); - // We need to check again the time because some of the transactions might not be explored - // above, e.g., transactions above the highest transaction mark for all the materialized - // views. - if (v.getValue() < minTime && (c.isEmpty() || !c.contains(v.getKey()))) { - if (LOG.isDebugEnabled()) { - LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}", - e.getKey(), v.getKey(), v.getValue()); - } - if (updateDeleteForTable != null) { - updateDeleteForTable.remove(v.getKey()); - } - it.remove(); - removed++; - } - } - } - return removed; - } - - /** - * Checks whether the given materialization exists in the invalidation cache. - * @param dbName the database name for the materialization - * @param tblName the table name for the materialization - * @return true if we have information about the materialization in the cache, - * false otherwise - */ - public boolean containsMaterialization(String dbName, String tblName) { - if (disable || dbName == null || tblName == null) { - return false; - } - ConcurrentMap<String, Materialization> dbMaterializations = materializations.get(dbName); - if (dbMaterializations == null || dbMaterializations.get(tblName) == null) { - // This is a table - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index 8ca9ede..9ce7d6d 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +35,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class); private Configuration conf; + private TxnStore txnHandler; @Override public long runFrequency(TimeUnit unit) { @@ -41,6 +45,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre @Override public void setConf(Configuration configuration) { conf = configuration; + txnHandler = TxnUtils.getTxnStore(conf); } @Override @@ -50,11 +55,26 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre @Override public void run() { - long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks( - MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); - if (removedCnt > 0) { - if (LOG.isDebugEnabled()) { - LOG.info("Number of materialization locks deleted: " + removedCnt); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaning up materialization rebuild locks"); + } + + TxnStore.MutexAPI.LockHandle handle = null; + try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name()); + ValidTxnList validTxnList = TxnUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); + long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList, + MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); + if (removedCnt > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Number of materialization locks deleted: " + removedCnt); + } + } + } catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } finally { + if(handle != null) { + handle.releaseLocks(); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 8721022..bdcbf41 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -1335,13 +1335,6 @@ public class ObjectStore implements RawStore, Configurable { } finally { if (!commited) { rollbackTransaction(); - } else { - if (MetaStoreUtils.isMaterializedViewTable(tbl)) { - // Add to the invalidation cache - MaterializationsInvalidationCache.get().createMaterializedView( - tbl.getDbName(), tbl.getTableName(), tbl.getCreationMetadata().getTablesUsed(), - tbl.getCreationMetadata().getValidTxnList()); - } } } } @@ -1439,10 +1432,6 @@ public class ObjectStore implements RawStore, Configurable { } finally { if (!success) { rollbackTransaction(); - } else { - if (materializedView) { - MaterializationsInvalidationCache.get().dropMaterializedView(dbName, tableName); - } } } return success; @@ -2285,13 +2274,14 @@ public class ObjectStore implements RawStore, Configurable { if (m == null) { return null; } + assert !m.isSetMaterializationTime(); Set<MTable> tablesUsed = new HashSet<>(); for (String fullyQualifiedName : m.getTablesUsed()) { String[] names = fullyQualifiedName.split("\\."); tablesUsed.add(getMTable(m.getCatName(), names[0], names[1], false).mtbl); } return new MCreationMetadata(m.getCatName(), m.getDbName(), m.getTblName(), - tablesUsed, m.getValidTxnList()); + tablesUsed, m.getValidTxnList(), System.currentTimeMillis()); } private CreationMetadata convertToCreationMetadata( @@ -2307,6 +2297,7 @@ public class ObjectStore implements RawStore, Configurable { } CreationMetadata r = new CreationMetadata(s.getCatalogName(), s.getDbName(), s.getTblName(), tablesUsed); + r.setMaterializationTime(s.getMaterializationTime()); if (s.getTxnList() != null) { r.setValidTxnList(s.getTxnList()); } @@ -4210,16 +4201,13 @@ public class ObjectStore implements RawStore, Configurable { MCreationMetadata newMcm = convertToMCreationMetadata(cm); MCreationMetadata mcm = getCreationMetadata(catName, dbname, tablename); mcm.setTables(newMcm.getTables()); + mcm.setMaterializationTime(newMcm.getMaterializationTime()); mcm.setTxnList(newMcm.getTxnList()); // commit the changes success = commitTransaction(); } finally { if (!success) { rollbackTransaction(); - } else { - // Add to the invalidation cache if the creation signature has changed - MaterializationsInvalidationCache.get().alterMaterializedView( - dbname, tablename, cm.getTablesUsed(), cm.getValidTxnList()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 74a301f..c2bbba5 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader; import org.apache.hadoop.hive.metastore.HiveAlterHandler; -import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask; import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask; @@ -762,8 +761,6 @@ public class MetastoreConf { TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + - MaterializationsCacheCleanerTask.class.getName() + "," + - MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask", "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + @@ -772,7 +769,8 @@ public class MetastoreConf { AcidHouseKeeperService.class.getName() + "," + AcidOpenTxnsCounterService.class.getName() + "," + AcidCompactionHistoryService.class.getName() + "," + - AcidWriteSetService.class.getName(), + AcidWriteSetService.class.getName() + "," + + MaterializationsRebuildLockCleanerTask.class.getName(), "Command separated list of tasks that will be started in separate threads. These will be" + " started only when the metastore is running as a separate service. They must " + "implement " + MetastoreTaskThread.class.getName()), http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java index 66b5d48..2d65126 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java @@ -22,8 +22,8 @@ import java.util.Set; /** * Represents the creation metadata of a materialization. * It includes the database and table name for the materialization, - * the set of tables that it uses, and the valid transaction list - * when it was created. + * the set of tables that it uses, the valid transaction list + * when it was created, and the creation/rebuild time. */ public class MCreationMetadata { @@ -32,17 +32,19 @@ public class MCreationMetadata { private String tblName; private Set<MTable> tables; private String txnList; + private long materializationTime; public MCreationMetadata() { } public MCreationMetadata(String catName, String dbName, String tblName, - Set<MTable> tables, String txnList) { + Set<MTable> tables, String txnList, long materializationTime) { this.catalogName = catName; this.dbName = dbName; this.tblName = tblName; this.tables = tables; this.txnList = txnList; + this.materializationTime = materializationTime; } public Set<MTable> getTables() { @@ -84,4 +86,12 @@ public class MCreationMetadata { public void setTblName(String tblName) { this.tblName = tblName; } + + public long getMaterializationTime() { + return materializationTime; + } + + public void setMaterializationTime(long materializationTime) { + this.materializationTime = materializationTime; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index f8c2ca2..2bae133 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -94,9 +94,9 @@ public final class TxnDbUtil { " CTC_DATABASE varchar(128) NOT NULL," + " CTC_TABLE varchar(128)," + " CTC_PARTITION varchar(767)," + - " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," + " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," + - " CTC_WRITEID bigint)"); + " CTC_WRITEID bigint," + + " CTC_UPDATE_DELETE char(1) NOT NULL)"); stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); @@ -194,6 +194,14 @@ public final class TxnDbUtil { " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))" ); + stmt.execute("CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (" + + " MRL_TXN_ID BIGINT NOT NULL, " + + " MRL_DB_NAME VARCHAR(128) NOT NULL, " + + " MRL_TBL_NAME VARCHAR(256) NOT NULL, " + + " MRL_LAST_HEARTBEAT BIGINT NOT NULL, " + + " PRIMARY KEY(MRL_TXN_ID))" + ); + try { stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\" VARCHAR(256) NOT " + @@ -336,6 +344,7 @@ public final class TxnDbUtil { success &= dropTable(stmt, "AUX_TABLE", retryCount); success &= dropTable(stmt, "WRITE_SET", retryCount); success &= dropTable(stmt, "REPL_TXN_MAP", retryCount); + success &= dropTable(stmt, "MATERIALIZATION_REBUILD_LOCKS", retryCount); /* * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other * table which are not txn related to generate primary key. So if these tables are dropped