http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 11affe3..8fba3df 100644 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -14508,6 +14508,7 @@ class CreationMetadata: - tblName - tablesUsed - validTxnList + - materializationTime """ thrift_spec = ( @@ -14517,14 +14518,16 @@ class CreationMetadata: (3, TType.STRING, 'tblName', None, None, ), # 3 (4, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 4 (5, TType.STRING, 'validTxnList', None, None, ), # 5 + (6, TType.I64, 'materializationTime', None, None, ), # 6 ) - def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None,): + def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None, materializationTime=None,): self.catName = catName self.dbName = dbName self.tblName = tblName self.tablesUsed = tablesUsed self.validTxnList = validTxnList + self.materializationTime = materializationTime def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -14565,6 +14568,11 @@ class CreationMetadata: self.validTxnList = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.I64: + self.materializationTime = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -14598,6 +14606,10 @@ class CreationMetadata: oprot.writeFieldBegin('validTxnList', TType.STRING, 5) oprot.writeString(self.validTxnList) oprot.writeFieldEnd() + if 
self.materializationTime is not None: + oprot.writeFieldBegin('materializationTime', TType.I64, 6) + oprot.writeI64(self.materializationTime) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -14620,6 +14632,7 @@ class CreationMetadata: value = (value * 31) ^ hash(self.tblName) value = (value * 31) ^ hash(self.tablesUsed) value = (value * 31) ^ hash(self.validTxnList) + value = (value * 31) ^ hash(self.materializationTime) return value def __repr__(self): @@ -17224,24 +17237,15 @@ class TableMeta: class Materialization: """ Attributes: - - tablesUsed - - validTxnList - - invalidationTime - sourceTablesUpdateDeleteModified """ thrift_spec = ( None, # 0 - (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1 - (2, TType.STRING, 'validTxnList', None, None, ), # 2 - (3, TType.I64, 'invalidationTime', None, None, ), # 3 - (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4 + (1, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 1 ) - def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,): - self.tablesUsed = tablesUsed - self.validTxnList = validTxnList - self.invalidationTime = invalidationTime + def __init__(self, sourceTablesUpdateDeleteModified=None,): self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified def read(self, iprot): @@ -17254,26 +17258,6 @@ class Materialization: if ftype == TType.STOP: break if fid == 1: - if ftype == TType.SET: - self.tablesUsed = set() - (_etype742, _size739) = iprot.readSetBegin() - for _i743 in xrange(_size739): - _elem744 = iprot.readString() - self.tablesUsed.add(_elem744) - iprot.readSetEnd() - else: - iprot.skip(ftype) - elif fid == 2: - if ftype == TType.STRING: - self.validTxnList = iprot.readString() - else: - iprot.skip(ftype) - elif fid == 3: - if ftype == TType.I64: - self.invalidationTime = iprot.readI64() - else: - iprot.skip(ftype) - elif fid == 4: if ftype == TType.BOOL: 
self.sourceTablesUpdateDeleteModified = iprot.readBool() else: @@ -17288,39 +17272,21 @@ class Materialization: oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) return oprot.writeStructBegin('Materialization') - if self.tablesUsed is not None: - oprot.writeFieldBegin('tablesUsed', TType.SET, 1) - oprot.writeSetBegin(TType.STRING, len(self.tablesUsed)) - for iter745 in self.tablesUsed: - oprot.writeString(iter745) - oprot.writeSetEnd() - oprot.writeFieldEnd() - if self.validTxnList is not None: - oprot.writeFieldBegin('validTxnList', TType.STRING, 2) - oprot.writeString(self.validTxnList) - oprot.writeFieldEnd() - if self.invalidationTime is not None: - oprot.writeFieldBegin('invalidationTime', TType.I64, 3) - oprot.writeI64(self.invalidationTime) - oprot.writeFieldEnd() if self.sourceTablesUpdateDeleteModified is not None: - oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4) + oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 1) oprot.writeBool(self.sourceTablesUpdateDeleteModified) oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() def validate(self): - if self.tablesUsed is None: - raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!') + if self.sourceTablesUpdateDeleteModified is None: + raise TProtocol.TProtocolException(message='Required field sourceTablesUpdateDeleteModified is unset!') return def __hash__(self): value = 17 - value = (value * 31) ^ hash(self.tablesUsed) - value = (value * 31) ^ hash(self.validTxnList) - value = (value * 31) ^ hash(self.invalidationTime) value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified) return value @@ -18197,44 +18163,44 @@ class WMFullResourcePlan: elif fid == 2: if ftype == TType.LIST: self.pools = [] - (_etype749, _size746) = iprot.readListBegin() - for _i750 in xrange(_size746): - _elem751 = WMPool() - _elem751.read(iprot) - self.pools.append(_elem751) + (_etype742, _size739) = 
iprot.readListBegin() + for _i743 in xrange(_size739): + _elem744 = WMPool() + _elem744.read(iprot) + self.pools.append(_elem744) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.LIST: self.mappings = [] - (_etype755, _size752) = iprot.readListBegin() - for _i756 in xrange(_size752): - _elem757 = WMMapping() - _elem757.read(iprot) - self.mappings.append(_elem757) + (_etype748, _size745) = iprot.readListBegin() + for _i749 in xrange(_size745): + _elem750 = WMMapping() + _elem750.read(iprot) + self.mappings.append(_elem750) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.LIST: self.triggers = [] - (_etype761, _size758) = iprot.readListBegin() - for _i762 in xrange(_size758): - _elem763 = WMTrigger() - _elem763.read(iprot) - self.triggers.append(_elem763) + (_etype754, _size751) = iprot.readListBegin() + for _i755 in xrange(_size751): + _elem756 = WMTrigger() + _elem756.read(iprot) + self.triggers.append(_elem756) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.LIST: self.poolTriggers = [] - (_etype767, _size764) = iprot.readListBegin() - for _i768 in xrange(_size764): - _elem769 = WMPoolTrigger() - _elem769.read(iprot) - self.poolTriggers.append(_elem769) + (_etype760, _size757) = iprot.readListBegin() + for _i761 in xrange(_size757): + _elem762 = WMPoolTrigger() + _elem762.read(iprot) + self.poolTriggers.append(_elem762) iprot.readListEnd() else: iprot.skip(ftype) @@ -18255,29 +18221,29 @@ class WMFullResourcePlan: if self.pools is not None: oprot.writeFieldBegin('pools', TType.LIST, 2) oprot.writeListBegin(TType.STRUCT, len(self.pools)) - for iter770 in self.pools: - iter770.write(oprot) + for iter763 in self.pools: + iter763.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.mappings is not None: oprot.writeFieldBegin('mappings', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.mappings)) - for iter771 in self.mappings: - iter771.write(oprot) + for iter764 
in self.mappings: + iter764.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter772 in self.triggers: - iter772.write(oprot) + for iter765 in self.triggers: + iter765.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.poolTriggers is not None: oprot.writeFieldBegin('poolTriggers', TType.LIST, 5) oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers)) - for iter773 in self.poolTriggers: - iter773.write(oprot) + for iter766 in self.poolTriggers: + iter766.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -18751,11 +18717,11 @@ class WMGetAllResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.resourcePlans = [] - (_etype777, _size774) = iprot.readListBegin() - for _i778 in xrange(_size774): - _elem779 = WMResourcePlan() - _elem779.read(iprot) - self.resourcePlans.append(_elem779) + (_etype770, _size767) = iprot.readListBegin() + for _i771 in xrange(_size767): + _elem772 = WMResourcePlan() + _elem772.read(iprot) + self.resourcePlans.append(_elem772) iprot.readListEnd() else: iprot.skip(ftype) @@ -18772,8 +18738,8 @@ class WMGetAllResourcePlanResponse: if self.resourcePlans is not None: oprot.writeFieldBegin('resourcePlans', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans)) - for iter780 in self.resourcePlans: - iter780.write(oprot) + for iter773 in self.resourcePlans: + iter773.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -19077,20 +19043,20 @@ class WMValidateResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.errors = [] - (_etype784, _size781) = iprot.readListBegin() - for _i785 in xrange(_size781): - _elem786 = iprot.readString() - self.errors.append(_elem786) + (_etype777, _size774) = iprot.readListBegin() + for _i778 in xrange(_size774): + _elem779 = iprot.readString() + 
self.errors.append(_elem779) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.warnings = [] - (_etype790, _size787) = iprot.readListBegin() - for _i791 in xrange(_size787): - _elem792 = iprot.readString() - self.warnings.append(_elem792) + (_etype783, _size780) = iprot.readListBegin() + for _i784 in xrange(_size780): + _elem785 = iprot.readString() + self.warnings.append(_elem785) iprot.readListEnd() else: iprot.skip(ftype) @@ -19107,15 +19073,15 @@ class WMValidateResourcePlanResponse: if self.errors is not None: oprot.writeFieldBegin('errors', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.errors)) - for iter793 in self.errors: - oprot.writeString(iter793) + for iter786 in self.errors: + oprot.writeString(iter786) oprot.writeListEnd() oprot.writeFieldEnd() if self.warnings is not None: oprot.writeFieldBegin('warnings', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.warnings)) - for iter794 in self.warnings: - oprot.writeString(iter794) + for iter787 in self.warnings: + oprot.writeString(iter787) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -19692,11 +19658,11 @@ class WMGetTriggersForResourePlanResponse: if fid == 1: if ftype == TType.LIST: self.triggers = [] - (_etype798, _size795) = iprot.readListBegin() - for _i799 in xrange(_size795): - _elem800 = WMTrigger() - _elem800.read(iprot) - self.triggers.append(_elem800) + (_etype791, _size788) = iprot.readListBegin() + for _i792 in xrange(_size788): + _elem793 = WMTrigger() + _elem793.read(iprot) + self.triggers.append(_elem793) iprot.readListEnd() else: iprot.skip(ftype) @@ -19713,8 +19679,8 @@ class WMGetTriggersForResourePlanResponse: if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter801 in self.triggers: - iter801.write(oprot) + for iter794 in self.triggers: + iter794.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() 
oprot.writeFieldStop() @@ -20898,11 +20864,11 @@ class SchemaVersion: elif fid == 4: if ftype == TType.LIST: self.cols = [] - (_etype805, _size802) = iprot.readListBegin() - for _i806 in xrange(_size802): - _elem807 = FieldSchema() - _elem807.read(iprot) - self.cols.append(_elem807) + (_etype798, _size795) = iprot.readListBegin() + for _i799 in xrange(_size795): + _elem800 = FieldSchema() + _elem800.read(iprot) + self.cols.append(_elem800) iprot.readListEnd() else: iprot.skip(ftype) @@ -20962,8 +20928,8 @@ class SchemaVersion: if self.cols is not None: oprot.writeFieldBegin('cols', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.cols)) - for iter808 in self.cols: - iter808.write(oprot) + for iter801 in self.cols: + iter801.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.state is not None: @@ -21218,11 +21184,11 @@ class FindSchemasByColsResp: if fid == 1: if ftype == TType.LIST: self.schemaVersions = [] - (_etype812, _size809) = iprot.readListBegin() - for _i813 in xrange(_size809): - _elem814 = SchemaVersionDescriptor() - _elem814.read(iprot) - self.schemaVersions.append(_elem814) + (_etype805, _size802) = iprot.readListBegin() + for _i806 in xrange(_size802): + _elem807 = SchemaVersionDescriptor() + _elem807.read(iprot) + self.schemaVersions.append(_elem807) iprot.readListEnd() else: iprot.skip(ftype) @@ -21239,8 +21205,8 @@ class FindSchemasByColsResp: if self.schemaVersions is not None: oprot.writeFieldBegin('schemaVersions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions)) - for iter815 in self.schemaVersions: - iter815.write(oprot) + for iter808 in self.schemaVersions: + iter808.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index fc640d0..cc77b50 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3243,13 +3243,15 @@ class CreationMetadata TBLNAME = 3 TABLESUSED = 4 VALIDTXNLIST = 5 + MATERIALIZATIONTIME = 6 FIELDS = { CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}, TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, - VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true} + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, + MATERIALIZATIONTIME => {:type => ::Thrift::Types::I64, :name => 'materializationTime', :optional => true} } def struct_fields; FIELDS; end @@ -3870,22 +3872,16 @@ end class Materialization include ::Thrift::Struct, ::Thrift::Struct_Union - TABLESUSED = 1 - VALIDTXNLIST = 2 - INVALIDATIONTIME = 3 - SOURCETABLESUPDATEDELETEMODIFIED = 4 + SOURCETABLESUPDATEDELETEMODIFIED = 1 FIELDS = { - TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, - VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}, - INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true}, - SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => 
true} + SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified'} } def struct_fields; FIELDS; end def validate - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field sourceTablesUpdateDeleteModified is unset!') if @sourceTablesUpdateDeleteModified.nil? end ::Thrift::Struct.generate_accessors self http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index bbf3f12..1e1a18f 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -726,13 +726,13 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_table_objects_by_name_req failed: unknown result') end - def get_materialization_invalidation_info(dbname, tbl_names) - send_get_materialization_invalidation_info(dbname, tbl_names) + def get_materialization_invalidation_info(creation_metadata, validTxnList) + send_get_materialization_invalidation_info(creation_metadata, validTxnList) return recv_get_materialization_invalidation_info() end - def send_get_materialization_invalidation_info(dbname, tbl_names) - send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :dbname => dbname, :tbl_names => tbl_names) + def send_get_materialization_invalidation_info(creation_metadata, validTxnList) + send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, 
:creation_metadata => creation_metadata, :validTxnList => validTxnList) end def recv_get_materialization_invalidation_info() @@ -4028,7 +4028,7 @@ module ThriftHiveMetastore args = read_args(iprot, Get_materialization_invalidation_info_args) result = Get_materialization_invalidation_info_result.new() begin - result.success = @handler.get_materialization_invalidation_info(args.dbname, args.tbl_names) + result.success = @handler.get_materialization_invalidation_info(args.creation_metadata, args.validTxnList) rescue ::MetaException => o1 result.o1 = o1 rescue ::InvalidOperationException => o2 @@ -7632,12 +7632,12 @@ module ThriftHiveMetastore class Get_materialization_invalidation_info_args include ::Thrift::Struct, ::Thrift::Struct_Union - DBNAME = 1 - TBL_NAMES = 2 + CREATION_METADATA = 1 + VALIDTXNLIST = 2 FIELDS = { - DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, - TBL_NAMES => {:type => ::Thrift::Types::LIST, :name => 'tbl_names', :element => {:type => ::Thrift::Types::STRING}} + CREATION_METADATA => {:type => ::Thrift::Types::STRUCT, :name => 'creation_metadata', :class => ::CreationMetadata}, + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList'} } def struct_fields; FIELDS; end @@ -7656,7 +7656,7 @@ module ThriftHiveMetastore O3 = 3 FIELDS = { - SUCCESS => {:type => ::Thrift::Types::MAP, :name => 'success', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::Materialization}}, + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::Materialization}, O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => ::MetaException}, O2 => {:type => ::Thrift::Types::STRUCT, :name => 'o2', :class => ::InvalidOperationException}, O3 => {:type => ::Thrift::Types::STRUCT, :name => 'o3', :class => ::UnknownDBException} 
http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index f3ad723..cd68cbe 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -3053,8 +3053,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { } @Override - public Map<String, Materialization> get_materialization_invalidation_info(final String dbName, final List<String> tableNames) { - return MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(dbName, tableNames); + public Materialization get_materialization_invalidation_info(final CreationMetadata cm, final String validTxnList) throws MetaException { + return getTxnHandler().getMaterializationInvalidationInfo(cm, validTxnList); } @Override @@ -8603,13 +8603,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId) throws TException { - return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId); + return getTxnHandler().lockMaterializationRebuild(dbName, tableName, txnId); } @Override public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId) throws TException { - return MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId); + return getTxnHandler().heartbeatLockMaterializationRebuild(dbName, tableName, txnId); } @Override @@ -8925,8 +8925,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { false); IHMSHandler handler = 
newRetryingHMSHandler(baseHandler, conf); - // Initialize materializations invalidation cache - MaterializationsInvalidationCache.get().init(conf, handler); TServerSocket serverSocket; if (useSasl) { http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 0eb7f1a..2a14dd4 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -167,8 +167,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { // instantiate the metastore server handler directly instead of connecting // through the network client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, true); - // Initialize materializations invalidation cache (only for local metastore) - MaterializationsInvalidationCache.get().init(conf, (IHMSHandler) client); isConnected = true; snapshotActiveConf(); return; @@ -1599,10 +1597,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override - public Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames) + public Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList) throws MetaException, InvalidOperationException, UnknownDBException, TException { - return client.get_materialization_invalidation_info( - dbName, filterHook.filterTableNames(getDefaultCatalog(conf), dbName, viewNames)); + return client.get_materialization_invalidation_info(cm, validTxnList); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index bc09076..234e0cf 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -773,7 +773,7 @@ public interface IMetaStoreClient { /** * Returns the invalidation information for the materialized views given as input. */ - Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames) + Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList) throws MetaException, InvalidOperationException, UnknownDBException, TException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java deleted file mode 100644 index cc168a9..0000000 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -/** - * Task responsible for cleaning the transactions that are not useful from the - * materializations cache. - */ -public class MaterializationsCacheCleanerTask implements MetastoreTaskThread { - private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class); - - private Configuration conf; - - @Override - public long runFrequency(TimeUnit unit) { - return MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit); - } - - @Override - public void setConf(Configuration configuration) { - conf = configuration; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void run() { - long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() - - MetastoreConf.getTimeVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS)); - if (removedCnt > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt); - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java deleted file mode 100644 index fc644f0..0000000 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ /dev/null @@ -1,543 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.metastore; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.hadoop.conf.Configuration; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; -import org.apache.hadoop.hive.metastore.api.LockResponse; -import org.apache.hadoop.hive.metastore.api.Materialization; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -/** - * This cache keeps information in memory about the table modifications so materialized views - * can verify their invalidation time, i.e., the moment after materialization on which the - * first transaction to the tables they used happened. This information is kept in memory - * to check the invalidation quickly. However, we store enough information in the metastore - * to bring this cache up if the metastore is restarted or would crashed. This cache lives - * in the metastore server. 
- */ -public final class MaterializationsInvalidationCache { - - private static final Logger LOG = LoggerFactory.getLogger(MaterializationsInvalidationCache.class); - - /* Singleton */ - private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache(); - - /* If this boolean is true, this class has no functionality. Only for debugging purposes. */ - private boolean disable; - - /* Key is the database name. Each value is a map from the unique view qualified name to - * the materialization invalidation info. This invalidation object contains information - * such as the tables used by the materialized view, whether there was any update or - * delete in the source tables since the materialized view was created or rebuilt, - * or the invalidation time, i.e., first modification of the tables used by materialized - * view after the view was created. */ - private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations = - new ConcurrentHashMap<>(); - - /* - * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent - * modifications) that will keep the modifications for a given table in the order of their - * transaction id. This is useful to quickly check the invalidation time for a given - * materialization. - */ - private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications = - new ConcurrentHashMap<>(); - - private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications = - new ConcurrentHashMap<>(); - - /* Whether the cache has been initialized or not. */ - private boolean initialized; - /* Configuration for cache. */ - private Configuration conf; - /* Handler to connect to metastore. */ - private IHMSHandler handler; - - private MaterializationsInvalidationCache() { - } - - /** - * Get instance of MaterializationsInvalidationCache. 
- * - * @return the singleton - */ - public static MaterializationsInvalidationCache get() { - return SINGLETON; - } - - /** - * Initialize the invalidation cache. - * - * The method is synchronized because we want to avoid initializing the invalidation cache - * multiple times in embedded mode. This will not happen when we run the metastore remotely - * as the method is called only once. - */ - public synchronized void init(Configuration conf, IHMSHandler handler) { - this.conf = conf; - this.handler = handler; - - // This will only be true for debugging purposes - this.disable = MetastoreConf.getVar(conf, - MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE"); - if (disable) { - // Nothing to do - return; - } - - if (!initialized) { - this.initialized = true; - ExecutorService pool = Executors.newCachedThreadPool(); - pool.submit(new Loader()); - pool.shutdown(); - } - } - - private class Loader implements Runnable { - @Override - public void run() { - try { - RawStore store = handler.getMS(); - for (String catName : store.getCatalogs()) { - for (String dbName : store.getAllDatabases(catName)) { - for (Table mv : store.getTableObjectsByName(catName, dbName, - store.getTables(catName, dbName, null, TableType.MATERIALIZED_VIEW))) { - addMaterializedView(mv.getDbName(), mv.getTableName(), ImmutableSet.copyOf(mv.getCreationMetadata().getTablesUsed()), - mv.getCreationMetadata().getValidTxnList(), OpType.LOAD); - } - } - } - LOG.info("Initialized materializations invalidation cache"); - } catch (Exception e) { - LOG.error("Problem connecting to the metastore when initializing the view registry"); - } - } - } - - /** - * Adds a newly created materialized view to the cache. 
- * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - */ - public void createMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList) { - addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.CREATE); - } - - /** - * Method to call when materialized view is modified. - * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - */ - public void alterMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList) { - addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.ALTER); - } - - /** - * Adds the materialized view to the cache. - * - * @param dbName - * @param tableName - * @param tablesUsed tables used by the materialized view - * @param validTxnList - * @param opType - */ - private void addMaterializedView(String dbName, String tableName, Set<String> tablesUsed, - String validTxnList, OpType opType) { - if (disable) { - // Nothing to do - return; - } - // We are going to create the map for each view in the given database - ConcurrentMap<String, Materialization> cq = - new ConcurrentHashMap<String, Materialization>(); - final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent( - dbName, cq); - if (prevCq != null) { - cq = prevCq; - } - // Start the process to add materialization to the cache - // Before loading the materialization in the cache, we need to update some - // important information in the registry to account for rewriting invalidation - if (validTxnList == null) { - // This can happen when the materialized view was created on non-transactional tables - return; - } - if (opType == OpType.CREATE || opType == OpType.ALTER) { - // You store the materialized view - Materialization materialization = new Materialization(tablesUsed); - materialization.setValidTxnList(validTxnList); - 
cq.put(tableName, materialization); - } else { - ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList); - for (String qNameTableUsed : tablesUsed) { - ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); - // First we insert a new tree set to keep table modifications, unless it already exists - ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); - final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent( - qNameTableUsed, modificationsTree); - if (prevModificationsTree != null) { - modificationsTree = prevModificationsTree; - } - // If we are not creating the MV at this instant, but instead it was created previously - // and we are loading it into the cache, we need to go through the transaction entries and - // check if the MV is still valid. - try { - String[] names = qNameTableUsed.split("\\."); - BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit( - names[0], names[1], tableTxnList); - if (!e.isIsnull()) { - modificationsTree.put(e.getTxnid(), e.getTime()); - // We do not need to do anything more for current table, as we detected - // a modification event that was in the metastore. - continue; - } - } catch (MetaException ex) { - LOG.debug("Materialized view " + Warehouse.getQualifiedName(dbName, tableName) + - " ignored; error loading view into invalidation cache", ex); - return; - } - } - // For LOAD, you only add it if it does exist as you might be loading an outdated MV - Materialization materialization = new Materialization(tablesUsed); - materialization.setValidTxnList(validTxnList); - cq.putIfAbsent(tableName, materialization); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Cached materialized view for rewriting in invalidation cache: " + - Warehouse.getQualifiedName(dbName, tableName)); - } - } - - /** - * This method is called when a table is modified. 
That way we can keep track of the - * invalidation for the MVs that use that table. - */ - public void notifyTableModification(String dbName, String tableName, - long txnId, long newModificationTime, boolean isUpdateDelete) { - if (disable) { - // Nothing to do - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}", - tableName, dbName, txnId, newModificationTime); - } - if (isUpdateDelete) { - // We update first the update/delete modifications record - ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>(); - final ConcurrentSkipListSet<Long> prevModificationsSet = - updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), - modificationsSet); - if (prevModificationsSet != null) { - modificationsSet = prevModificationsSet; - } - modificationsSet.add(txnId); - } - ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>(); - final ConcurrentSkipListMap<Long, Long> prevModificationsTree = - tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree); - if (prevModificationsTree != null) { - modificationsTree = prevModificationsTree; - } - modificationsTree.put(txnId, newModificationTime); - } - - /** - * Removes the materialized view from the cache. - * - * @param dbName - * @param tableName - */ - public void dropMaterializedView(String dbName, String tableName) { - if (disable) { - // Nothing to do - return; - } - materializations.get(dbName).remove(tableName); - } - - /** - * Returns the materialized views in the cache for the given database. 
- * - * @param dbName the database - * @return the collection of materialized views, or the empty collection if none - */ - public Map<String, Materialization> getMaterializationInvalidationInfo( - String dbName, List<String> materializationNames) { - if (materializations.get(dbName) != null) { - ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder(); - for (String materializationName : materializationNames) { - Materialization materialization = - materializations.get(dbName).get(materializationName); - if (materialization == null) { - LOG.debug("Materialization {} skipped as there is no information " - + "in the invalidation cache about it", materializationName); - continue; - } - // We create a deep copy of the materialization, as we need to set the time - // and whether any update/delete operation happen on the tables that it uses - // since it was created. - Materialization materializationCopy = new Materialization( - materialization.getTablesUsed()); - materializationCopy.setValidTxnList(materialization.getValidTxnList()); - enrichWithInvalidationInfo(materializationCopy); - m.put(materializationName, materializationCopy); - } - Map<String, Materialization> result = m.build(); - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved the following materializations from the invalidation cache: {}", result); - } - return result; - } - return ImmutableMap.of(); - } - - private void enrichWithInvalidationInfo(Materialization materialization) { - String materializationTxnListString = materialization.getValidTxnList(); - if (materializationTxnListString == null) { - // This can happen when the materialization was created on non-transactional tables - materialization.setInvalidationTime(Long.MIN_VALUE); - return; - } - - // We will obtain the modification time as follows. 
- // First, we obtain the first element after high watermark (if any) - // Then, we iterate through the elements from min open txn till high - // watermark, updating the modification time after creation if needed - ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString); - long firstModificationTimeAfterCreation = 0L; - boolean containsUpdateDelete = false; - for (String qNameTableUsed : materialization.getTablesUsed()) { - final ValidWriteIdList tableMaterializationTxnList = - materializationTxnList.getTableValidWriteIdList(qNameTableUsed); - - final ConcurrentSkipListMap<Long, Long> usedTableModifications = - tableModifications.get(qNameTableUsed); - if (usedTableModifications == null) { - // This is not necessarily an error, since the table may be empty. To be safe, - // instead of including this materialized view, we just log the information and - // skip it (if table is really empty, it will not matter for performance anyway). - LOG.warn("No information found in invalidation cache for table {}, possible tables are: {}", - qNameTableUsed, tableModifications.keySet()); - materialization.setInvalidationTime(Long.MIN_VALUE); - return; - } - final ConcurrentSkipListSet<Long> usedUDTableModifications = - updateDeleteTableModifications.get(qNameTableUsed); - final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark()); - if (tn != null) { - if (firstModificationTimeAfterCreation == 0L || - tn.getValue() < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = tn.getValue(); - } - // Check if there was any update/delete after creation - containsUpdateDelete = usedUDTableModifications != null && - !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty(); - } - // Min open txn might be null if there were no open transactions - // when this transaction was being executed - if 
(tableMaterializationTxnList.getMinOpenWriteId() != null) { - // Invalid transaction list is sorted - int pos = 0; - for (Map.Entry<Long, Long> t : usedTableModifications - .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) { - while (pos < tableMaterializationTxnList.getInvalidWriteIds().length && - tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) { - pos++; - } - if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) { - break; - } - if (firstModificationTimeAfterCreation == 0L || - t.getValue() < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = t.getValue(); - } - containsUpdateDelete = containsUpdateDelete || - (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey())); - } - } - } - - materialization.setInvalidationTime(firstModificationTimeAfterCreation); - materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete); - } - - private enum OpType { - CREATE, - LOAD, - ALTER - } - - /** - * Removes transaction events that are not relevant anymore. 
- * @param minTime events generated before this time (ms) can be deleted from the cache - * @return number of events that were deleted from the cache - */ - public long cleanup(long minTime) { - // To remove, mv should meet two conditions: - // 1) Current time - time of transaction > config parameter, and - // 2) Transaction should not be associated with invalidation of a MV - if (disable || !initialized) { - // Bail out - return 0L; - } - // We execute the cleanup in two steps - // First we gather all the transactions that need to be kept - final Multimap<String, Long> keepTxnInfos = HashMultimap.create(); - for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) { - for (Materialization m : e.getValue().values()) { - ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList()); - boolean canBeDeleted = false; - String currentTableForInvalidatingTxn = null; - long currentInvalidatingTxnId = 0L; - long currentInvalidatingTxnTime = 0L; - for (String qNameTableUsed : m.getTablesUsed()) { - ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed); - final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed) - .higherEntry(tableTxnList.getHighWatermark()); - if (tn != null) { - if (currentInvalidatingTxnTime == 0L || - tn.getValue() < currentInvalidatingTxnTime) { - // This transaction 1) is the first one examined for this materialization, or - // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. - // 1.- We remove the previous invalidating transaction from the transactions - // to be kept (if needed). - if (canBeDeleted && currentInvalidatingTxnTime < minTime) { - keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); - } - // 2.- We add this transaction to the transactions that should be kept. 
- canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey()); - keepTxnInfos.put(qNameTableUsed, tn.getKey()); - // 3.- We record this transaction as the current invalidating transaction. - currentTableForInvalidatingTxn = qNameTableUsed; - currentInvalidatingTxnId = tn.getKey(); - currentInvalidatingTxnTime = tn.getValue(); - } - } - if (tableTxnList.getMinOpenWriteId() != null) { - // Invalid transaction list is sorted - int pos = 0; - for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed) - .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) { - while (pos < tableTxnList.getInvalidWriteIds().length && - tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) { - pos++; - } - if (pos >= tableTxnList.getInvalidWriteIds().length) { - break; - } - if (currentInvalidatingTxnTime == 0L || - t.getValue() < currentInvalidatingTxnTime) { - // This transaction 1) is the first one examined for this materialization, or - // 2) it is the invalidating transaction. Hence we add it to the transactions to keep. - // 1.- We remove the previous invalidating transaction from the transactions - // to be kept (if needed). - if (canBeDeleted && currentInvalidatingTxnTime < minTime) { - keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId); - } - // 2.- We add this transaction to the transactions that should be kept. - canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey()); - keepTxnInfos.put(qNameTableUsed, t.getKey()); - // 3.- We record this transaction as the current invalidating transaction. 
- currentTableForInvalidatingTxn = qNameTableUsed; - currentInvalidatingTxnId = t.getKey(); - currentInvalidatingTxnTime = t.getValue(); - } - } - } - } - } - } - // Second, we remove the transactions - long removed = 0L; - for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) { - Collection<Long> c = keepTxnInfos.get(e.getKey()); - ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey()); - for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) { - Entry<Long, Long> v = it.next(); - // We need to check again the time because some of the transactions might not be explored - // above, e.g., transactions above the highest transaction mark for all the materialized - // views. - if (v.getValue() < minTime && (c.isEmpty() || !c.contains(v.getKey()))) { - if (LOG.isDebugEnabled()) { - LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}", - e.getKey(), v.getKey(), v.getValue()); - } - if (updateDeleteForTable != null) { - updateDeleteForTable.remove(v.getKey()); - } - it.remove(); - removed++; - } - } - } - return removed; - } - - /** - * Checks whether the given materialization exists in the invalidation cache. 
- * @param dbName the database name for the materialization - * @param tblName the table name for the materialization - * @return true if we have information about the materialization in the cache, - * false otherwise - */ - public boolean containsMaterialization(String dbName, String tblName) { - if (disable || dbName == null || tblName == null) { - return false; - } - ConcurrentMap<String, Materialization> dbMaterializations = materializations.get(dbName); - if (dbMaterializations == null || dbMaterializations.get(tblName) == null) { - // This is a table - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index 8ca9ede..9ce7d6d 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +35,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class); private Configuration conf; + private TxnStore 
txnHandler; @Override public long runFrequency(TimeUnit unit) { @@ -41,6 +45,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre @Override public void setConf(Configuration configuration) { conf = configuration; + txnHandler = TxnUtils.getTxnStore(conf); } @Override @@ -50,11 +55,26 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre @Override public void run() { - long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks( - MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); - if (removedCnt > 0) { - if (LOG.isDebugEnabled()) { - LOG.info("Number of materialization locks deleted: " + removedCnt); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaning up materialization rebuild locks"); + } + + TxnStore.MutexAPI.LockHandle handle = null; + try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name()); + ValidTxnList validTxnList = TxnUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); + long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList, + MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); + if (removedCnt > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Number of materialization locks deleted: " + removedCnt); + } + } + } catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } finally { + if(handle != null) { + handle.releaseLocks(); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 3e186b7..fdadf12 100644 
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -1327,13 +1327,6 @@ public class ObjectStore implements RawStore, Configurable { } finally { if (!commited) { rollbackTransaction(); - } else { - if (MetaStoreUtils.isMaterializedViewTable(tbl)) { - // Add to the invalidation cache - MaterializationsInvalidationCache.get().createMaterializedView( - tbl.getDbName(), tbl.getTableName(), tbl.getCreationMetadata().getTablesUsed(), - tbl.getCreationMetadata().getValidTxnList()); - } } } } @@ -1431,10 +1424,6 @@ public class ObjectStore implements RawStore, Configurable { } finally { if (!success) { rollbackTransaction(); - } else { - if (materializedView) { - MaterializationsInvalidationCache.get().dropMaterializedView(dbName, tableName); - } } } return success; @@ -2278,13 +2267,14 @@ public class ObjectStore implements RawStore, Configurable { if (m == null) { return null; } + assert !m.isSetMaterializationTime(); Set<MTable> tablesUsed = new HashSet<>(); for (String fullyQualifiedName : m.getTablesUsed()) { String[] names = fullyQualifiedName.split("\\."); tablesUsed.add(getMTable(m.getCatName(), names[0], names[1], false).mtbl); } return new MCreationMetadata(m.getCatName(), m.getDbName(), m.getTblName(), - tablesUsed, m.getValidTxnList()); + tablesUsed, m.getValidTxnList(), System.currentTimeMillis()); } private CreationMetadata convertToCreationMetadata( @@ -2300,6 +2290,7 @@ public class ObjectStore implements RawStore, Configurable { } CreationMetadata r = new CreationMetadata(s.getCatalogName(), s.getDbName(), s.getTblName(), tablesUsed); + r.setMaterializationTime(s.getMaterializationTime()); if (s.getTxnList() != null) { r.setValidTxnList(s.getTxnList()); } @@ -4075,16 +4066,13 @@ public class ObjectStore implements RawStore, Configurable { MCreationMetadata newMcm = convertToMCreationMetadata(cm); MCreationMetadata mcm = 
getCreationMetadata(catName, dbname, tablename); mcm.setTables(newMcm.getTables()); + mcm.setMaterializationTime(newMcm.getMaterializationTime()); mcm.setTxnList(newMcm.getTxnList()); // commit the changes success = commitTransaction(); } finally { if (!success) { rollbackTransaction(); - } else { - // Add to the invalidation cache if the creation signature has changed - MaterializationsInvalidationCache.get().alterMaterializedView( - dbname, tablename, cm.getTablesUsed(), cm.getValidTxnList()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 99d38e2..d5f9721 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader; import org.apache.hadoop.hive.metastore.HiveAlterHandler; -import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask; import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask; @@ -760,7 +759,6 @@ public class MetastoreConf { TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always", EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," + - 
MaterializationsCacheCleanerTask.class.getName() + "," + MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName(), "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java index 66b5d48..2d65126 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java @@ -22,8 +22,8 @@ import java.util.Set; /** * Represents the creation metadata of a materialization. * It includes the database and table name for the materialization, - * the set of tables that it uses, and the valid transaction list - * when it was created. + * the set of tables that it uses, the valid transaction list + * when it was created, and the creation/rebuild time. 
*/ public class MCreationMetadata { @@ -32,17 +32,19 @@ public class MCreationMetadata { private String tblName; private Set<MTable> tables; private String txnList; + private long materializationTime; public MCreationMetadata() { } public MCreationMetadata(String catName, String dbName, String tblName, - Set<MTable> tables, String txnList) { + Set<MTable> tables, String txnList, long materializationTime) { this.catalogName = catName; this.dbName = dbName; this.tblName = tblName; this.tables = tables; this.txnList = txnList; + this.materializationTime = materializationTime; } public Set<MTable> getTables() { @@ -84,4 +86,12 @@ public class MCreationMetadata { public void setTblName(String tblName) { this.tblName = tblName; } + + public long getMaterializationTime() { + return materializationTime; + } + + public void setMaterializationTime(long materializationTime) { + this.materializationTime = materializationTime; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 50bfca3..8764c21 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -94,9 +94,9 @@ public final class TxnDbUtil { " CTC_DATABASE varchar(128) NOT NULL," + " CTC_TABLE varchar(128)," + " CTC_PARTITION varchar(767)," + - " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," + " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," + - " CTC_WRITEID bigint)"); + " CTC_WRITEID bigint," + + " CTC_UPDATE_DELETE char(1) NOT NULL)"); stmt.execute("CREATE TABLE NEXT_TXN_ID 
(" + " NTXN_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); @@ -194,6 +194,14 @@ public final class TxnDbUtil { " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))" ); + stmt.execute("CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (" + + " MRL_TXN_ID BIGINT NOT NULL, " + + " MRL_DB_NAME VARCHAR(128) NOT NULL, " + + " MRL_TBL_NAME VARCHAR(256) NOT NULL, " + + " MRL_LAST_HEARTBEAT BIGINT NOT NULL, " + + " PRIMARY KEY(MRL_TXN_ID))" + ); + try { stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\" VARCHAR(256) NOT " + http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index e2a2a39..b2a22f1 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -26,10 +26,10 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; -import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.SortedSet; -import java.util.TimeZone; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; @@ -64,11 +63,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; 
import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; -import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; -import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; @@ -842,10 +840,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { - MaterializationsRebuildLockHandler materializationsRebuildLockHandler = - MaterializationsRebuildLockHandler.get(); - List<TransactionRegistryInfo> txnComponents = new ArrayList<>(); - boolean isUpdateDelete = false; + char isUpdateDelete = 'N'; long txnid = rqst.getTxnid(); long sourceTxnId = -1; @@ -902,7 +897,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); if (rs.next()) { - isUpdateDelete = true; + isUpdateDelete = 'Y'; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict /** @@ -995,8 +990,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. 
String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + - "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " + - "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid; + "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " + + "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); int modCount = 0; if ((modCount = stmt.executeUpdate(s)) < 1) { @@ -1005,15 +1000,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } - // Obtain information that we need to update registry - s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; - LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - // We only enter in this loop if the transaction actually affected any table - txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2), - rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime())); - } s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); modCount = stmt.executeUpdate(s); @@ -1028,6 +1014,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { modCount = stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); + s = "delete from MATERIALIZATION_REBUILD_LOCKS where mrl_txn_id = " + txnid; + LOG.debug("Going to execute update <" + s + ">"); + modCount = stmt.executeUpdate(s); + if (rqst.isSetReplPolicy()) { s = "delete from REPL_TXN_MAP where 
RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); @@ -1040,24 +1030,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator); } - MaterializationsInvalidationCache materializationsInvalidationCache = - MaterializationsInvalidationCache.get(); - for (TransactionRegistryInfo info : txnComponents) { - if (materializationsInvalidationCache.containsMaterialization(info.dbName, info.tblName) && - !materializationsRebuildLockHandler.readyToCommitResource(info.dbName, info.tblName, txnid)) { - throw new MetaException( - "Another process is rebuilding the materialized view " + info.fullyQualifiedName); - } - } LOG.debug("Going to commit"); close(rs); dbConn.commit(); - - // Update registry with modifications - for (TransactionRegistryInfo info : txnComponents) { - materializationsInvalidationCache.notifyTableModification( - info.dbName, info.tblName, info.writeId, info.timestamp, isUpdateDelete); - } } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -1068,9 +1043,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { close(commitIdRs); close(lockHandle, stmt, dbConn); unlockInternal(); - for (TransactionRegistryInfo info : txnComponents) { - materializationsRebuildLockHandler.unlockResource(info.dbName, info.tblName, txnid); - } } } catch (RetryException e) { commitTxn(rqst); @@ -1600,16 +1572,30 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * Gets the information of the first transaction for the given table - * after the transaction with the input id was committed (if any). + * Get invalidation info for the materialization. Currently, the materialization information + * only contains information about whether there was update/delete operations on the source + * tables used by the materialization since it was created. 
*/ @Override @RetrySemantics.ReadOnly - public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( - String inputDbName, String inputTableName, ValidWriteIdList txnList) - throws MetaException { - final List<Long> openTxns = Arrays.asList(ArrayUtils.toObject(txnList.getInvalidWriteIds())); + public Materialization getMaterializationInvalidationInfo( + CreationMetadata creationMetadata, String validTxnListStr) throws MetaException { + if (creationMetadata.getTablesUsed().isEmpty()) { + // Bail out + LOG.warn("Materialization creation metadata does not contain any table"); + return null; + } + + // Parse validTxnList + final ValidReadTxnList validTxnList = + new ValidReadTxnList(validTxnListStr); + // Parse validReaderWriteIdList from creation metadata + final ValidTxnWriteIdList validReaderWriteIdList = + new ValidTxnWriteIdList(creationMetadata.getValidTxnList()); + + // We are composing a query that returns a single row if an update happened after + // the materialization was created. Otherwise, query returns 0 rows. Connection dbConn = null; Statement stmt = null; ResultSet rs = null; @@ -1617,32 +1603,207 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); stmt.setMaxRows(1); - String s = "select ctc_timestamp, ctc_writeid, ctc_database, ctc_table " - + "from COMPLETED_TXN_COMPONENTS " - + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName) - + " and ctc_writeid > " + txnList.getHighWatermark() - + (txnList.getInvalidWriteIds().length == 0 ? - " " : " or ctc_writeid IN(" + StringUtils.join(",", openTxns) + ") ") - + "order by ctc_timestamp asc"; + StringBuilder query = new StringBuilder(); + // compose a query that select transactions containing an update... 
+ query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND ("); + int i = 0; + for (String fullyQualifiedName : creationMetadata.getTablesUsed()) { + // ...for each of the tables that are part of the materialized view, + // where the transaction had to be committed after the materialization was created... + if (i != 0) { + query.append("OR"); + } + String[] names = TxnUtils.getDbTableName(fullyQualifiedName); + query.append(" (ctc_database=" + quoteString(names[0]) + " AND ctc_table=" + quoteString(names[1])); + ValidWriteIdList tblValidWriteIdList = + validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName); + if (tblValidWriteIdList == null) { + LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName); + return null; + } + query.append(" AND (ctc_writeid > " + tblValidWriteIdList.getHighWatermark()); + query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") " : + " OR ctc_writeid IN(" + StringUtils.join(",", + Arrays.asList(ArrayUtils.toObject(tblValidWriteIdList.getInvalidWriteIds()))) + ") "); + query.append(") "); + i++; + } + // ... and where the transaction has already been committed as per snapshot taken + // when we are running current query + query.append(") AND ctc_txnid <= " + validTxnList.getHighWatermark()); + query.append(validTxnList.getInvalidTransactions().length == 0 ?
" " : + " AND ctc_txnid NOT IN(" + StringUtils.join(",", + Arrays.asList(ArrayUtils.toObject(validTxnList.getInvalidTransactions()))) + ") "); + + // Execute query + String s = query.toString(); if (LOG.isDebugEnabled()) { LOG.debug("Going to execute query <" + s + ">"); } rs = stmt.executeQuery(s); - if(!rs.next()) { - return new BasicTxnInfo(true); + return new Materialization(rs.next()); + } catch (SQLException ex) { + LOG.warn("getMaterializationInvalidationInfo failed due to " + getMessage(ex), ex); + throw new MetaException("Unable to retrieve materialization invalidation information due to " + + StringUtils.stringifyException(ex)); + } finally { + close(rs, stmt, dbConn); + } + } + + @Override + public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) + throws MetaException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Acquiring lock for materialization rebuild with txnId={} for {}", txnId, Warehouse.getQualifiedName(dbName,tableName)); + } + + TxnStore.MutexAPI.LockHandle handle = null; + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + lockInternal(); + /** + * MUTEX_KEY.MaterializationRebuild lock ensures that there is only 1 entry in + * Initiated/Working state for any resource. This ensures we do not run concurrent + * rebuild operations on any materialization. 
+ */ + handle = getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name()); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" + + " mrl_db_name =" + quoteString(dbName) + + " AND mrl_tbl_name=" + quoteString(tableName); + LOG.debug("Going to execute query <" + selectQ + ">"); + rs = stmt.executeQuery(selectQ); + if(rs.next()) { + LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName + + " since it is already being rebuilt"); + return new LockResponse(txnId, LockState.NOT_ACQUIRED); } - final BasicTxnInfo txnInfo = new BasicTxnInfo(false); - txnInfo.setTime(rs.getTimestamp(1, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); - txnInfo.setTxnid(rs.getLong(2)); - txnInfo.setDbname(rs.getString(3)); - txnInfo.setTablename(rs.getString(4)); - return txnInfo; + String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " + + "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId + + ", '" + dbName + "', '" + tableName + "', " + Instant.now().toEpochMilli() + ")"; + LOG.debug("Going to execute update <" + insertQ + ">"); + stmt.executeUpdate(insertQ); + LOG.debug("Going to commit"); + dbConn.commit(); + return new LockResponse(txnId, LockState.ACQUIRED); } catch (SQLException ex) { - LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex); - throw new MetaException("Unable to retrieve commits information due to " + StringUtils.stringifyException(ex)); + LOG.warn("lockMaterializationRebuild failed due to " + getMessage(ex), ex); + throw new MetaException("Unable to retrieve materialization invalidation information due to " + + StringUtils.stringifyException(ex)); } finally { close(rs, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } + + @Override + public boolean heartbeatLockMaterializationRebuild(String dbName, 
String tableName, long txnId) + throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "update MATERIALIZATION_REBUILD_LOCKS" + + " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() + + " where mrl_txn_id = " + txnId + + " AND mrl_db_name =" + quoteString(dbName) + + " AND mrl_tbl_name=" + quoteString(tableName); + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < 1) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + LOG.info("No lock found for rebuild of " + Warehouse.getQualifiedName(dbName, tableName) + + " when trying to heartbeat"); + // It could not be renewed, return that information + return false; + } + LOG.debug("Going to commit"); + dbConn.commit(); + // It could be renewed, return that information + return true; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, + "heartbeatLockMaterializationRebuild(" + Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")"); + throw new MetaException("Unable to heartbeat rebuild lock due to " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return heartbeatLockMaterializationRebuild(dbName, tableName ,txnId); + } + } + + @Override + public long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout) throws MetaException { + try { + // Aux values + long cnt = 0L; + List<Long> txnIds = new ArrayList<>(); + long timeoutTime = Instant.now().toEpochMilli() - timeout; + + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + String selectQ = "select 
mrl_txn_id, mrl_last_heartbeat from MATERIALIZATION_REBUILD_LOCKS"; + LOG.debug("Going to execute query <" + selectQ + ">"); + rs = stmt.executeQuery(selectQ); + while(rs.next()) { + long lastHeartbeat = rs.getLong(2); + if (lastHeartbeat < timeoutTime) { + // The heartbeat has timeout, double check whether we can remove it + long txnId = rs.getLong(1); + if (validTxnList.isTxnValid(txnId) || validTxnList.isTxnAborted(txnId)) { + // Txn was committed (but notification was not received) or it was aborted. + // Either case, we can clean it up + txnIds.add(txnId); + } + } + } + if (!txnIds.isEmpty()) { + String deleteQ = "delete from MATERIALIZATION_REBUILD_LOCKS where" + + " mrl_txn_id IN(" + StringUtils.join(",", txnIds) + ") "; + LOG.debug("Going to execute update <" + deleteQ + ">"); + cnt = stmt.executeUpdate(deleteQ); + } + LOG.debug("Going to commit"); + dbConn.commit(); + return cnt; + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanupMaterializationRebuildLocks"); + throw new MetaException("Unable to clean rebuild locks due to " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return cleanupMaterializationRebuildLocks(validTxnList, timeout); } } @@ -1915,6 +2076,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private static String normalizeCase(String s) { return s == null ? 
null : s.toLowerCase(); } + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { try { @@ -4802,20 +4964,4 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } }; - private class TransactionRegistryInfo { - final String dbName; - final String tblName; - final String fullyQualifiedName; - final long writeId; - final long timestamp; - - public TransactionRegistryInfo (String dbName, String tblName, long writeId, long timestamp) { - this.dbName = dbName; - this.tblName = tblName; - this.fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName); - this.writeId = writeId; - this.timestamp = timestamp; - } - } - }
