http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py 
b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 11affe3..8fba3df 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -14508,6 +14508,7 @@ class CreationMetadata:
    - tblName
    - tablesUsed
    - validTxnList
+   - materializationTime
   """
 
   thrift_spec = (
@@ -14517,14 +14518,16 @@ class CreationMetadata:
     (3, TType.STRING, 'tblName', None, None, ), # 3
     (4, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 4
     (5, TType.STRING, 'validTxnList', None, None, ), # 5
+    (6, TType.I64, 'materializationTime', None, None, ), # 6
   )
 
-  def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None,):
+  def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None, materializationTime=None,):
     self.catName = catName
     self.dbName = dbName
     self.tblName = tblName
     self.tablesUsed = tablesUsed
     self.validTxnList = validTxnList
+    self.materializationTime = materializationTime
 
   def read(self, iprot):
    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -14565,6 +14568,11 @@ class CreationMetadata:
           self.validTxnList = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.I64:
+          self.materializationTime = iprot.readI64()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -14598,6 +14606,10 @@ class CreationMetadata:
       oprot.writeFieldBegin('validTxnList', TType.STRING, 5)
       oprot.writeString(self.validTxnList)
       oprot.writeFieldEnd()
+    if self.materializationTime is not None:
+      oprot.writeFieldBegin('materializationTime', TType.I64, 6)
+      oprot.writeI64(self.materializationTime)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -14620,6 +14632,7 @@ class CreationMetadata:
     value = (value * 31) ^ hash(self.tblName)
     value = (value * 31) ^ hash(self.tablesUsed)
     value = (value * 31) ^ hash(self.validTxnList)
+    value = (value * 31) ^ hash(self.materializationTime)
     return value
 
   def __repr__(self):
@@ -17224,24 +17237,15 @@ class TableMeta:
 class Materialization:
   """
   Attributes:
-   - tablesUsed
-   - validTxnList
-   - invalidationTime
    - sourceTablesUpdateDeleteModified
   """
 
   thrift_spec = (
     None, # 0
-    (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1
-    (2, TType.STRING, 'validTxnList', None, None, ), # 2
-    (3, TType.I64, 'invalidationTime', None, None, ), # 3
-    (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4
+    (1, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 1
   )
 
-  def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,):
-    self.tablesUsed = tablesUsed
-    self.validTxnList = validTxnList
-    self.invalidationTime = invalidationTime
+  def __init__(self, sourceTablesUpdateDeleteModified=None,):
     self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified
 
   def read(self, iprot):
@@ -17254,26 +17258,6 @@ class Materialization:
       if ftype == TType.STOP:
         break
       if fid == 1:
-        if ftype == TType.SET:
-          self.tablesUsed = set()
-          (_etype742, _size739) = iprot.readSetBegin()
-          for _i743 in xrange(_size739):
-            _elem744 = iprot.readString()
-            self.tablesUsed.add(_elem744)
-          iprot.readSetEnd()
-        else:
-          iprot.skip(ftype)
-      elif fid == 2:
-        if ftype == TType.STRING:
-          self.validTxnList = iprot.readString()
-        else:
-          iprot.skip(ftype)
-      elif fid == 3:
-        if ftype == TType.I64:
-          self.invalidationTime = iprot.readI64()
-        else:
-          iprot.skip(ftype)
-      elif fid == 4:
         if ftype == TType.BOOL:
           self.sourceTablesUpdateDeleteModified = iprot.readBool()
         else:
@@ -17288,39 +17272,21 @@ class Materialization:
      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
       return
     oprot.writeStructBegin('Materialization')
-    if self.tablesUsed is not None:
-      oprot.writeFieldBegin('tablesUsed', TType.SET, 1)
-      oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
-      for iter745 in self.tablesUsed:
-        oprot.writeString(iter745)
-      oprot.writeSetEnd()
-      oprot.writeFieldEnd()
-    if self.validTxnList is not None:
-      oprot.writeFieldBegin('validTxnList', TType.STRING, 2)
-      oprot.writeString(self.validTxnList)
-      oprot.writeFieldEnd()
-    if self.invalidationTime is not None:
-      oprot.writeFieldBegin('invalidationTime', TType.I64, 3)
-      oprot.writeI64(self.invalidationTime)
-      oprot.writeFieldEnd()
     if self.sourceTablesUpdateDeleteModified is not None:
-      oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4)
+      oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 1)
       oprot.writeBool(self.sourceTablesUpdateDeleteModified)
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
   def validate(self):
-    if self.tablesUsed is None:
-      raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!')
+    if self.sourceTablesUpdateDeleteModified is None:
+      raise TProtocol.TProtocolException(message='Required field sourceTablesUpdateDeleteModified is unset!')
     return
 
 
   def __hash__(self):
     value = 17
-    value = (value * 31) ^ hash(self.tablesUsed)
-    value = (value * 31) ^ hash(self.validTxnList)
-    value = (value * 31) ^ hash(self.invalidationTime)
     value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified)
     return value
 
@@ -18197,44 +18163,44 @@ class WMFullResourcePlan:
       elif fid == 2:
         if ftype == TType.LIST:
           self.pools = []
-          (_etype749, _size746) = iprot.readListBegin()
-          for _i750 in xrange(_size746):
-            _elem751 = WMPool()
-            _elem751.read(iprot)
-            self.pools.append(_elem751)
+          (_etype742, _size739) = iprot.readListBegin()
+          for _i743 in xrange(_size739):
+            _elem744 = WMPool()
+            _elem744.read(iprot)
+            self.pools.append(_elem744)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.LIST:
           self.mappings = []
-          (_etype755, _size752) = iprot.readListBegin()
-          for _i756 in xrange(_size752):
-            _elem757 = WMMapping()
-            _elem757.read(iprot)
-            self.mappings.append(_elem757)
+          (_etype748, _size745) = iprot.readListBegin()
+          for _i749 in xrange(_size745):
+            _elem750 = WMMapping()
+            _elem750.read(iprot)
+            self.mappings.append(_elem750)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.LIST:
           self.triggers = []
-          (_etype761, _size758) = iprot.readListBegin()
-          for _i762 in xrange(_size758):
-            _elem763 = WMTrigger()
-            _elem763.read(iprot)
-            self.triggers.append(_elem763)
+          (_etype754, _size751) = iprot.readListBegin()
+          for _i755 in xrange(_size751):
+            _elem756 = WMTrigger()
+            _elem756.read(iprot)
+            self.triggers.append(_elem756)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.LIST:
           self.poolTriggers = []
-          (_etype767, _size764) = iprot.readListBegin()
-          for _i768 in xrange(_size764):
-            _elem769 = WMPoolTrigger()
-            _elem769.read(iprot)
-            self.poolTriggers.append(_elem769)
+          (_etype760, _size757) = iprot.readListBegin()
+          for _i761 in xrange(_size757):
+            _elem762 = WMPoolTrigger()
+            _elem762.read(iprot)
+            self.poolTriggers.append(_elem762)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -18255,29 +18221,29 @@ class WMFullResourcePlan:
     if self.pools is not None:
       oprot.writeFieldBegin('pools', TType.LIST, 2)
       oprot.writeListBegin(TType.STRUCT, len(self.pools))
-      for iter770 in self.pools:
-        iter770.write(oprot)
+      for iter763 in self.pools:
+        iter763.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.mappings is not None:
       oprot.writeFieldBegin('mappings', TType.LIST, 3)
       oprot.writeListBegin(TType.STRUCT, len(self.mappings))
-      for iter771 in self.mappings:
-        iter771.write(oprot)
+      for iter764 in self.mappings:
+        iter764.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.triggers is not None:
       oprot.writeFieldBegin('triggers', TType.LIST, 4)
       oprot.writeListBegin(TType.STRUCT, len(self.triggers))
-      for iter772 in self.triggers:
-        iter772.write(oprot)
+      for iter765 in self.triggers:
+        iter765.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.poolTriggers is not None:
       oprot.writeFieldBegin('poolTriggers', TType.LIST, 5)
       oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers))
-      for iter773 in self.poolTriggers:
-        iter773.write(oprot)
+      for iter766 in self.poolTriggers:
+        iter766.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -18751,11 +18717,11 @@ class WMGetAllResourcePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.resourcePlans = []
-          (_etype777, _size774) = iprot.readListBegin()
-          for _i778 in xrange(_size774):
-            _elem779 = WMResourcePlan()
-            _elem779.read(iprot)
-            self.resourcePlans.append(_elem779)
+          (_etype770, _size767) = iprot.readListBegin()
+          for _i771 in xrange(_size767):
+            _elem772 = WMResourcePlan()
+            _elem772.read(iprot)
+            self.resourcePlans.append(_elem772)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -18772,8 +18738,8 @@ class WMGetAllResourcePlanResponse:
     if self.resourcePlans is not None:
       oprot.writeFieldBegin('resourcePlans', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans))
-      for iter780 in self.resourcePlans:
-        iter780.write(oprot)
+      for iter773 in self.resourcePlans:
+        iter773.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -19077,20 +19043,20 @@ class WMValidateResourcePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.errors = []
-          (_etype784, _size781) = iprot.readListBegin()
-          for _i785 in xrange(_size781):
-            _elem786 = iprot.readString()
-            self.errors.append(_elem786)
+          (_etype777, _size774) = iprot.readListBegin()
+          for _i778 in xrange(_size774):
+            _elem779 = iprot.readString()
+            self.errors.append(_elem779)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.LIST:
           self.warnings = []
-          (_etype790, _size787) = iprot.readListBegin()
-          for _i791 in xrange(_size787):
-            _elem792 = iprot.readString()
-            self.warnings.append(_elem792)
+          (_etype783, _size780) = iprot.readListBegin()
+          for _i784 in xrange(_size780):
+            _elem785 = iprot.readString()
+            self.warnings.append(_elem785)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -19107,15 +19073,15 @@ class WMValidateResourcePlanResponse:
     if self.errors is not None:
       oprot.writeFieldBegin('errors', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.errors))
-      for iter793 in self.errors:
-        oprot.writeString(iter793)
+      for iter786 in self.errors:
+        oprot.writeString(iter786)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.warnings is not None:
       oprot.writeFieldBegin('warnings', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.warnings))
-      for iter794 in self.warnings:
-        oprot.writeString(iter794)
+      for iter787 in self.warnings:
+        oprot.writeString(iter787)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -19692,11 +19658,11 @@ class WMGetTriggersForResourePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.triggers = []
-          (_etype798, _size795) = iprot.readListBegin()
-          for _i799 in xrange(_size795):
-            _elem800 = WMTrigger()
-            _elem800.read(iprot)
-            self.triggers.append(_elem800)
+          (_etype791, _size788) = iprot.readListBegin()
+          for _i792 in xrange(_size788):
+            _elem793 = WMTrigger()
+            _elem793.read(iprot)
+            self.triggers.append(_elem793)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -19713,8 +19679,8 @@ class WMGetTriggersForResourePlanResponse:
     if self.triggers is not None:
       oprot.writeFieldBegin('triggers', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.triggers))
-      for iter801 in self.triggers:
-        iter801.write(oprot)
+      for iter794 in self.triggers:
+        iter794.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -20898,11 +20864,11 @@ class SchemaVersion:
       elif fid == 4:
         if ftype == TType.LIST:
           self.cols = []
-          (_etype805, _size802) = iprot.readListBegin()
-          for _i806 in xrange(_size802):
-            _elem807 = FieldSchema()
-            _elem807.read(iprot)
-            self.cols.append(_elem807)
+          (_etype798, _size795) = iprot.readListBegin()
+          for _i799 in xrange(_size795):
+            _elem800 = FieldSchema()
+            _elem800.read(iprot)
+            self.cols.append(_elem800)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -20962,8 +20928,8 @@ class SchemaVersion:
     if self.cols is not None:
       oprot.writeFieldBegin('cols', TType.LIST, 4)
       oprot.writeListBegin(TType.STRUCT, len(self.cols))
-      for iter808 in self.cols:
-        iter808.write(oprot)
+      for iter801 in self.cols:
+        iter801.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.state is not None:
@@ -21218,11 +21184,11 @@ class FindSchemasByColsResp:
       if fid == 1:
         if ftype == TType.LIST:
           self.schemaVersions = []
-          (_etype812, _size809) = iprot.readListBegin()
-          for _i813 in xrange(_size809):
-            _elem814 = SchemaVersionDescriptor()
-            _elem814.read(iprot)
-            self.schemaVersions.append(_elem814)
+          (_etype805, _size802) = iprot.readListBegin()
+          for _i806 in xrange(_size802):
+            _elem807 = SchemaVersionDescriptor()
+            _elem807.read(iprot)
+            self.schemaVersions.append(_elem807)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -21239,8 +21205,8 @@ class FindSchemasByColsResp:
     if self.schemaVersions is not None:
       oprot.writeFieldBegin('schemaVersions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions))
-      for iter815 in self.schemaVersions:
-        iter815.write(oprot)
+      for iter808 in self.schemaVersions:
+        iter808.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb 
b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index fc640d0..cc77b50 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3243,13 +3243,15 @@ class CreationMetadata
   TBLNAME = 3
   TABLESUSED = 4
   VALIDTXNLIST = 5
+  MATERIALIZATIONTIME = 6
 
   FIELDS = {
     CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName'},
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
     TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
    TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
-    VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}
+    VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true},
+    MATERIALIZATIONTIME => {:type => ::Thrift::Types::I64, :name => 'materializationTime', :optional => true}
   }
 
   def struct_fields; FIELDS; end
@@ -3870,22 +3872,16 @@ end
 
 class Materialization
   include ::Thrift::Struct, ::Thrift::Struct_Union
-  TABLESUSED = 1
-  VALIDTXNLIST = 2
-  INVALIDATIONTIME = 3
-  SOURCETABLESUPDATEDELETEMODIFIED = 4
+  SOURCETABLESUPDATEDELETEMODIFIED = 1
 
   FIELDS = {
-    TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', 
:element => {:type => ::Thrift::Types::STRING}},
-    VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 
'validTxnList', :optional => true},
-    INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 
'invalidationTime', :optional => true},
-    SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name 
=> 'sourceTablesUpdateDeleteModified', :optional => true}
+    SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name 
=> 'sourceTablesUpdateDeleteModified'}
   }
 
   def struct_fields; FIELDS; end
 
   def validate
-    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field sourceTablesUpdateDeleteModified is unset!') if @sourceTablesUpdateDeleteModified.nil?
   end
 
   ::Thrift::Struct.generate_accessors self

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb 
b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index bbf3f12..1e1a18f 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -726,13 +726,13 @@ module ThriftHiveMetastore
       raise 
::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT,
 'get_table_objects_by_name_req failed: unknown result')
     end
 
-    def get_materialization_invalidation_info(dbname, tbl_names)
-      send_get_materialization_invalidation_info(dbname, tbl_names)
+    def get_materialization_invalidation_info(creation_metadata, validTxnList)
+      send_get_materialization_invalidation_info(creation_metadata, 
validTxnList)
       return recv_get_materialization_invalidation_info()
     end
 
-    def send_get_materialization_invalidation_info(dbname, tbl_names)
-      send_message('get_materialization_invalidation_info', 
Get_materialization_invalidation_info_args, :dbname => dbname, :tbl_names => 
tbl_names)
+    def send_get_materialization_invalidation_info(creation_metadata, 
validTxnList)
+      send_message('get_materialization_invalidation_info', 
Get_materialization_invalidation_info_args, :creation_metadata => 
creation_metadata, :validTxnList => validTxnList)
     end
 
     def recv_get_materialization_invalidation_info()
@@ -4028,7 +4028,7 @@ module ThriftHiveMetastore
       args = read_args(iprot, Get_materialization_invalidation_info_args)
       result = Get_materialization_invalidation_info_result.new()
       begin
-        result.success = 
@handler.get_materialization_invalidation_info(args.dbname, args.tbl_names)
+        result.success = 
@handler.get_materialization_invalidation_info(args.creation_metadata, 
args.validTxnList)
       rescue ::MetaException => o1
         result.o1 = o1
       rescue ::InvalidOperationException => o2
@@ -7632,12 +7632,12 @@ module ThriftHiveMetastore
 
   class Get_materialization_invalidation_info_args
     include ::Thrift::Struct, ::Thrift::Struct_Union
-    DBNAME = 1
-    TBL_NAMES = 2
+    CREATION_METADATA = 1
+    VALIDTXNLIST = 2
 
     FIELDS = {
-      DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
-      TBL_NAMES => {:type => ::Thrift::Types::LIST, :name => 'tbl_names', 
:element => {:type => ::Thrift::Types::STRING}}
+      CREATION_METADATA => {:type => ::Thrift::Types::STRUCT, :name => 
'creation_metadata', :class => ::CreationMetadata},
+      VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 
'validTxnList'}
     }
 
     def struct_fields; FIELDS; end
@@ -7656,7 +7656,7 @@ module ThriftHiveMetastore
     O3 = 3
 
     FIELDS = {
-      SUCCESS => {:type => ::Thrift::Types::MAP, :name => 'success', :key => 
{:type => ::Thrift::Types::STRING}, :value => {:type => 
::Thrift::Types::STRUCT, :class => ::Materialization}},
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class 
=> ::Materialization},
       O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => 
::MetaException},
       O2 => {:type => ::Thrift::Types::STRUCT, :name => 'o2', :class => 
::InvalidOperationException},
       O3 => {:type => ::Thrift::Types::STRUCT, :name => 'o3', :class => 
::UnknownDBException}

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index f3ad723..cd68cbe 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -3053,8 +3053,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     @Override
-    public Map<String, Materialization> get_materialization_invalidation_info(final String dbName, final List<String> tableNames) {
-      return MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(dbName, tableNames);
+    public Materialization get_materialization_invalidation_info(final CreationMetadata cm, final String validTxnList) throws MetaException {
+      return getTxnHandler().getMaterializationInvalidationInfo(cm, validTxnList);
     }
 
     @Override
@@ -8603,13 +8603,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
     public LockResponse get_lock_materialization_rebuild(String dbName, String 
tableName, long txnId)
         throws TException {
-      return MaterializationsRebuildLockHandler.get().lockResource(dbName, 
tableName, txnId);
+      return getTxnHandler().lockMaterializationRebuild(dbName, tableName, 
txnId);
     }
 
     @Override
     public boolean heartbeat_lock_materialization_rebuild(String dbName, 
String tableName, long txnId)
         throws TException {
-      return 
MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, 
txnId);
+      return getTxnHandler().heartbeatLockMaterializationRebuild(dbName, 
tableName, txnId);
     }
 
     @Override
@@ -8925,8 +8925,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           false);
       IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
 
-      // Initialize materializations invalidation cache
-      MaterializationsInvalidationCache.get().init(conf, handler);
       TServerSocket serverSocket;
 
       if (useSasl) {

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 0eb7f1a..2a14dd4 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -167,8 +167,6 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient, AutoCloseable {
       // instantiate the metastore server handler directly instead of 
connecting
       // through the network
       client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, 
true);
-      // Initialize materializations invalidation cache (only for local 
metastore)
-      MaterializationsInvalidationCache.get().init(conf, (IHMSHandler) client);
       isConnected = true;
       snapshotActiveConf();
       return;
@@ -1599,10 +1597,9 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient, AutoCloseable {
   }
 
   @Override
-  public Map<String, Materialization> 
getMaterializationsInvalidationInfo(String dbName, List<String> viewNames)
+  public Materialization getMaterializationInvalidationInfo(CreationMetadata 
cm, String validTxnList)
       throws MetaException, InvalidOperationException, UnknownDBException, 
TException {
-    return client.get_materialization_invalidation_info(
-        dbName, filterHook.filterTableNames(getDefaultCatalog(conf), dbName, 
viewNames));
+    return client.get_materialization_invalidation_info(cm, validTxnList);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index bc09076..234e0cf 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -773,7 +773,7 @@ public interface IMetaStoreClient {
   /**
    * Returns the invalidation information for the materialized views given as 
input.
    */
-  Map<String, Materialization> getMaterializationsInvalidationInfo(String 
dbName, List<String> viewNames)
+  Materialization getMaterializationInvalidationInfo(CreationMetadata cm, 
String validTxnList)
       throws MetaException, InvalidOperationException, UnknownDBException, 
TException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
deleted file mode 100644
index cc168a9..0000000
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Task responsible for cleaning the transactions that are not useful from the
- * materializations cache.
- */
-public class MaterializationsCacheCleanerTask implements MetastoreTaskThread {
-  private static final Logger LOG = 
LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class);
-
-  private Configuration conf;
-
-  @Override
-  public long runFrequency(TimeUnit unit) {
-    return MetastoreConf.getTimeVar(conf,
-        
MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, 
unit);
-  }
-
-  @Override
-  public void setConf(Configuration configuration) {
-    conf = configuration;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void run() {
-    long removedCnt = 
MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() -
-        MetastoreConf.getTimeVar(conf,
-            
MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, 
TimeUnit.MILLISECONDS));
-    if (removedCnt > 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Number of transaction entries deleted from materializations 
cache: " + removedCnt);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
deleted file mode 100644
index fc644f0..0000000
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.hadoop.conf.Configuration;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.Materialization;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * This cache keeps information in memory about the table modifications so 
materialized views
- * can verify their invalidation time, i.e., the moment after materialization 
on which the
- * first transaction to the tables they used happened. This information is 
kept in memory
- * to check the invalidation quickly. However, we store enough information in 
the metastore
- * to bring this cache up if the metastore is restarted or would crashed. This 
cache lives
- * in the metastore server.
- */
-public final class MaterializationsInvalidationCache {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MaterializationsInvalidationCache.class);
-
-  /* Singleton */
-  private static final MaterializationsInvalidationCache SINGLETON = new 
MaterializationsInvalidationCache();
-
-  /* If this boolean is true, this class has no functionality. Only for 
debugging purposes. */
-  private boolean disable;
-
-  /* Key is the database name. Each value is a map from the unique view 
qualified name to
-   * the materialization invalidation info. This invalidation object contains 
information
-   * such as the tables used by the materialized view, whether there was any 
update or
-   * delete in the source tables since the materialized view was created or 
rebuilt,
-   * or the invalidation time, i.e., first modification of the tables used by 
materialized
-   * view after the view was created. */
-  private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> 
materializations =
-      new ConcurrentHashMap<>();
-
-  /*
-   * Key is a qualified table name. The value is a (sorted) tree map 
(supporting concurrent
-   * modifications) that will keep the modifications for a given table in the 
order of their
-   * transaction id. This is useful to quickly check the invalidation time for 
a given
-   * materialization.
-   */
-  private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> 
tableModifications =
-      new ConcurrentHashMap<>();
-
-  private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> 
updateDeleteTableModifications =
-      new ConcurrentHashMap<>();
-
-  /* Whether the cache has been initialized or not. */
-  private boolean initialized;
-  /* Configuration for cache. */
-  private Configuration conf;
-  /* Handler to connect to metastore. */
-  private IHMSHandler handler;
-
-  private MaterializationsInvalidationCache() {
-  }
-
-  /**
-   * Get instance of MaterializationsInvalidationCache.
-   *
-   * @return the singleton
-   */
-  public static MaterializationsInvalidationCache get() {
-    return SINGLETON;
-  }
-
-  /**
-   * Initialize the invalidation cache.
-   *
-   * The method is synchronized because we want to avoid initializing the 
invalidation cache
-   * multiple times in embedded mode. This will not happen when we run the 
metastore remotely
-   * as the method is called only once.
-   */
-  public synchronized void init(Configuration conf, IHMSHandler handler) {
-    this.conf = conf;
-    this.handler = handler;
-
-    // This will only be true for debugging purposes
-    this.disable = MetastoreConf.getVar(conf,
-        
MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE");
-    if (disable) {
-      // Nothing to do
-      return;
-    }
-
-    if (!initialized) {
-      this.initialized = true;
-      ExecutorService pool = Executors.newCachedThreadPool();
-      pool.submit(new Loader());
-      pool.shutdown();
-    }
-  }
-
-  private class Loader implements Runnable {
-    @Override
-    public void run() {
-      try {
-        RawStore store = handler.getMS();
-        for (String catName : store.getCatalogs()) {
-          for (String dbName : store.getAllDatabases(catName)) {
-            for (Table mv : store.getTableObjectsByName(catName, dbName,
-                store.getTables(catName, dbName, null, 
TableType.MATERIALIZED_VIEW))) {
-              addMaterializedView(mv.getDbName(), mv.getTableName(), 
ImmutableSet.copyOf(mv.getCreationMetadata().getTablesUsed()),
-                  mv.getCreationMetadata().getValidTxnList(), OpType.LOAD);
-            }
-          }
-        }
-        LOG.info("Initialized materializations invalidation cache");
-      } catch (Exception e) {
-        LOG.error("Problem connecting to the metastore when initializing the 
view registry");
-      }
-    }
-  }
-
-  /**
-   * Adds a newly created materialized view to the cache.
-   *
-   * @param dbName
-   * @param tableName
-   * @param tablesUsed tables used by the materialized view
-   * @param validTxnList
-   */
-  public void createMaterializedView(String dbName, String tableName, 
Set<String> tablesUsed,
-      String validTxnList) {
-    addMaterializedView(dbName, tableName, tablesUsed, validTxnList, 
OpType.CREATE);
-  }
-
-  /**
-   * Method to call when materialized view is modified.
-   *
-   * @param dbName
-   * @param tableName
-   * @param tablesUsed tables used by the materialized view
-   * @param validTxnList
-   */
-  public void alterMaterializedView(String dbName, String tableName, 
Set<String> tablesUsed,
-      String validTxnList) {
-    addMaterializedView(dbName, tableName, tablesUsed, validTxnList, 
OpType.ALTER);
-  }
-
-  /**
-   * Adds the materialized view to the cache.
-   *
-   * @param dbName
-   * @param tableName
-   * @param tablesUsed tables used by the materialized view
-   * @param validTxnList
-   * @param opType
-   */
-  private void addMaterializedView(String dbName, String tableName, 
Set<String> tablesUsed,
-      String validTxnList, OpType opType) {
-    if (disable) {
-      // Nothing to do
-      return;
-    }
-    // We are going to create the map for each view in the given database
-    ConcurrentMap<String, Materialization> cq =
-        new ConcurrentHashMap<String, Materialization>();
-    final ConcurrentMap<String, Materialization> prevCq = 
materializations.putIfAbsent(
-        dbName, cq);
-    if (prevCq != null) {
-      cq = prevCq;
-    }
-    // Start the process to add materialization to the cache
-    // Before loading the materialization in the cache, we need to update some
-    // important information in the registry to account for rewriting 
invalidation
-    if (validTxnList == null) {
-      // This can happen when the materialized view was created on 
non-transactional tables
-      return;
-    }
-    if (opType == OpType.CREATE || opType == OpType.ALTER) {
-      // You store the materialized view
-      Materialization materialization = new Materialization(tablesUsed);
-      materialization.setValidTxnList(validTxnList);
-      cq.put(tableName, materialization);
-    } else {
-      ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList);
-      for (String qNameTableUsed : tablesUsed) {
-        ValidWriteIdList tableTxnList = 
txnList.getTableValidWriteIdList(qNameTableUsed);
-        // First we insert a new tree set to keep table modifications, unless 
it already exists
-        ConcurrentSkipListMap<Long, Long> modificationsTree = new 
ConcurrentSkipListMap<>();
-        final ConcurrentSkipListMap<Long, Long> prevModificationsTree = 
tableModifications.putIfAbsent(
-                qNameTableUsed, modificationsTree);
-        if (prevModificationsTree != null) {
-          modificationsTree = prevModificationsTree;
-        }
-        // If we are not creating the MV at this instant, but instead it was 
created previously
-        // and we are loading it into the cache, we need to go through the 
transaction entries and
-        // check if the MV is still valid.
-        try {
-          String[] names =  qNameTableUsed.split("\\.");
-          BasicTxnInfo e = 
handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit(
-                  names[0], names[1], tableTxnList);
-          if (!e.isIsnull()) {
-            modificationsTree.put(e.getTxnid(), e.getTime());
-            // We do not need to do anything more for current table, as we 
detected
-            // a modification event that was in the metastore.
-            continue;
-          }
-        } catch (MetaException ex) {
-          LOG.debug("Materialized view " + Warehouse.getQualifiedName(dbName, 
tableName) +
-                  " ignored; error loading view into invalidation cache", ex);
-          return;
-        }
-      }
-      // For LOAD, you only add it if it does exist as you might be loading an 
outdated MV
-      Materialization materialization = new Materialization(tablesUsed);
-      materialization.setValidTxnList(validTxnList);
-      cq.putIfAbsent(tableName, materialization);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Cached materialized view for rewriting in invalidation cache: 
" +
-          Warehouse.getQualifiedName(dbName, tableName));
-    }
-  }
-
-  /**
-   * This method is called when a table is modified. That way we can keep 
track of the
-   * invalidation for the MVs that use that table.
-   */
-  public void notifyTableModification(String dbName, String tableName,
-      long txnId, long newModificationTime, boolean isUpdateDelete) {
-    if (disable) {
-      // Nothing to do
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Notification for table {} in database {} received -> id: {}, 
time: {}",
-          tableName, dbName, txnId, newModificationTime);
-    }
-    if (isUpdateDelete) {
-      // We update first the update/delete modifications record
-      ConcurrentSkipListSet<Long> modificationsSet = new 
ConcurrentSkipListSet<>();
-      final ConcurrentSkipListSet<Long> prevModificationsSet =
-          
updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, 
tableName),
-              modificationsSet);
-      if (prevModificationsSet != null) {
-        modificationsSet = prevModificationsSet;
-      }
-      modificationsSet.add(txnId);
-    }
-    ConcurrentSkipListMap<Long, Long> modificationsTree = new 
ConcurrentSkipListMap<>();
-    final ConcurrentSkipListMap<Long, Long> prevModificationsTree =
-        tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, 
tableName), modificationsTree);
-    if (prevModificationsTree != null) {
-      modificationsTree = prevModificationsTree;
-    }
-    modificationsTree.put(txnId, newModificationTime);
-  }
-
-  /**
-   * Removes the materialized view from the cache.
-   *
-   * @param dbName
-   * @param tableName
-   */
-  public void dropMaterializedView(String dbName, String tableName) {
-    if (disable) {
-      // Nothing to do
-      return;
-    }
-    materializations.get(dbName).remove(tableName);
-  }
-
-  /**
-   * Returns the materialized views in the cache for the given database.
-   *
-   * @param dbName the database
-   * @return the collection of materialized views, or the empty collection if 
none
-   */
-  public Map<String, Materialization> getMaterializationInvalidationInfo(
-      String dbName, List<String> materializationNames) {
-    if (materializations.get(dbName) != null) {
-      ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder();
-      for (String materializationName : materializationNames) {
-        Materialization materialization =
-            materializations.get(dbName).get(materializationName);
-        if (materialization == null) {
-          LOG.debug("Materialization {} skipped as there is no information "
-              + "in the invalidation cache about it", materializationName);
-          continue;
-        }
-        // We create a deep copy of the materialization, as we need to set the 
time
-        // and whether any update/delete operation happen on the tables that 
it uses
-        // since it was created.
-        Materialization materializationCopy = new Materialization(
-            materialization.getTablesUsed());
-        materializationCopy.setValidTxnList(materialization.getValidTxnList());
-        enrichWithInvalidationInfo(materializationCopy);
-        m.put(materializationName, materializationCopy);
-      }
-      Map<String, Materialization> result = m.build();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Retrieved the following materializations from the 
invalidation cache: {}", result);
-      }
-      return result;
-    }
-    return ImmutableMap.of();
-  }
-
-  private void enrichWithInvalidationInfo(Materialization materialization) {
-    String materializationTxnListString = materialization.getValidTxnList();
-    if (materializationTxnListString == null) {
-      // This can happen when the materialization was created on 
non-transactional tables
-      materialization.setInvalidationTime(Long.MIN_VALUE);
-      return;
-    }
-
-    // We will obtain the modification time as follows.
-    // First, we obtain the first element after high watermark (if any)
-    // Then, we iterate through the elements from min open txn till high
-    // watermark, updating the modification time after creation if needed
-    ValidTxnWriteIdList materializationTxnList = new 
ValidTxnWriteIdList(materializationTxnListString);
-    long firstModificationTimeAfterCreation = 0L;
-    boolean containsUpdateDelete = false;
-    for (String qNameTableUsed : materialization.getTablesUsed()) {
-      final ValidWriteIdList tableMaterializationTxnList =
-          materializationTxnList.getTableValidWriteIdList(qNameTableUsed);
-
-      final ConcurrentSkipListMap<Long, Long> usedTableModifications =
-          tableModifications.get(qNameTableUsed);
-      if (usedTableModifications == null) {
-        // This is not necessarily an error, since the table may be empty. To 
be safe,
-        // instead of including this materialized view, we just log the 
information and
-        // skip it (if table is really empty, it will not matter for 
performance anyway).
-        LOG.warn("No information found in invalidation cache for table {}, 
possible tables are: {}",
-            qNameTableUsed, tableModifications.keySet());
-        materialization.setInvalidationTime(Long.MIN_VALUE);
-        return;
-      }
-      final ConcurrentSkipListSet<Long> usedUDTableModifications =
-          updateDeleteTableModifications.get(qNameTableUsed);
-      final Entry<Long, Long> tn = 
usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark());
-      if (tn != null) {
-        if (firstModificationTimeAfterCreation == 0L ||
-            tn.getValue() < firstModificationTimeAfterCreation) {
-          firstModificationTimeAfterCreation = tn.getValue();
-        }
-        // Check if there was any update/delete after creation
-        containsUpdateDelete = usedUDTableModifications != null &&
-            
!usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(),
 false).isEmpty();
-      }
-      // Min open txn might be null if there were no open transactions
-      // when this transaction was being executed
-      if (tableMaterializationTxnList.getMinOpenWriteId() != null) {
-        // Invalid transaction list is sorted
-        int pos = 0;
-        for (Map.Entry<Long, Long> t : usedTableModifications
-                .subMap(tableMaterializationTxnList.getMinOpenWriteId(), 
tableMaterializationTxnList.getHighWatermark()).entrySet()) {
-          while (pos < tableMaterializationTxnList.getInvalidWriteIds().length 
&&
-              tableMaterializationTxnList.getInvalidWriteIds()[pos] != 
t.getKey()) {
-            pos++;
-          }
-          if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) {
-            break;
-          }
-          if (firstModificationTimeAfterCreation == 0L ||
-              t.getValue() < firstModificationTimeAfterCreation) {
-            firstModificationTimeAfterCreation = t.getValue();
-          }
-          containsUpdateDelete = containsUpdateDelete ||
-              (usedUDTableModifications != null && 
usedUDTableModifications.contains(t.getKey()));
-        }
-      }
-    }
-
-    materialization.setInvalidationTime(firstModificationTimeAfterCreation);
-    materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete);
-  }
-
-  private enum OpType {
-    CREATE,
-    LOAD,
-    ALTER
-  }
-
-  /**
-   * Removes transaction events that are not relevant anymore.
-   * @param minTime events generated before this time (ms) can be deleted from 
the cache
-   * @return number of events that were deleted from the cache
-   */
-  public long cleanup(long minTime) {
-    // To remove, mv should meet two conditions:
-    // 1) Current time - time of transaction > config parameter, and
-    // 2) Transaction should not be associated with invalidation of a MV
-    if (disable || !initialized) {
-      // Bail out
-      return 0L;
-    }
-    // We execute the cleanup in two steps
-    // First we gather all the transactions that need to be kept
-    final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
-    for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : 
materializations.entrySet()) {
-      for (Materialization m : e.getValue().values()) {
-        ValidTxnWriteIdList txnList = new 
ValidTxnWriteIdList(m.getValidTxnList());
-        boolean canBeDeleted = false;
-        String currentTableForInvalidatingTxn = null;
-        long currentInvalidatingTxnId = 0L;
-        long currentInvalidatingTxnTime = 0L;
-        for (String qNameTableUsed : m.getTablesUsed()) {
-          ValidWriteIdList tableTxnList = 
txnList.getTableValidWriteIdList(qNameTableUsed);
-          final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
-              .higherEntry(tableTxnList.getHighWatermark());
-          if (tn != null) {
-            if (currentInvalidatingTxnTime == 0L ||
-                tn.getValue() < currentInvalidatingTxnTime) {
-              // This transaction 1) is the first one examined for this 
materialization, or
-              // 2) it is the invalidating transaction. Hence we add it to the 
transactions to keep.
-              // 1.- We remove the previous invalidating transaction from the 
transactions
-              // to be kept (if needed).
-              if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
-                keepTxnInfos.remove(currentTableForInvalidatingTxn, 
currentInvalidatingTxnId);
-              }
-              // 2.- We add this transaction to the transactions that should 
be kept.
-              canBeDeleted = 
!keepTxnInfos.get(qNameTableUsed).contains(tn.getKey());
-              keepTxnInfos.put(qNameTableUsed, tn.getKey());
-              // 3.- We record this transaction as the current invalidating 
transaction.
-              currentTableForInvalidatingTxn = qNameTableUsed;
-              currentInvalidatingTxnId = tn.getKey();
-              currentInvalidatingTxnTime = tn.getValue();
-            }
-          }
-          if (tableTxnList.getMinOpenWriteId() != null) {
-            // Invalid transaction list is sorted
-            int pos = 0;
-            for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
-                .subMap(tableTxnList.getMinOpenWriteId(), 
tableTxnList.getHighWatermark()).entrySet()) {
-              while (pos < tableTxnList.getInvalidWriteIds().length &&
-                  tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
-                pos++;
-              }
-              if (pos >= tableTxnList.getInvalidWriteIds().length) {
-                break;
-              }
-              if (currentInvalidatingTxnTime == 0L ||
-                  t.getValue() < currentInvalidatingTxnTime) {
-                // This transaction 1) is the first one examined for this 
materialization, or
-                // 2) it is the invalidating transaction. Hence we add it to 
the transactions to keep.
-                // 1.- We remove the previous invalidating transaction from 
the transactions
-                // to be kept (if needed).
-                if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
-                  keepTxnInfos.remove(currentTableForInvalidatingTxn, 
currentInvalidatingTxnId);
-                }
-                // 2.- We add this transaction to the transactions that should 
be kept.
-                canBeDeleted = 
!keepTxnInfos.get(qNameTableUsed).contains(t.getKey());
-                keepTxnInfos.put(qNameTableUsed, t.getKey());
-                // 3.- We record this transaction as the current invalidating 
transaction.
-                currentTableForInvalidatingTxn = qNameTableUsed;
-                currentInvalidatingTxnId = t.getKey();
-                currentInvalidatingTxnTime = t.getValue();
-              }
-            }
-          }
-        }
-      }
-    }
-    // Second, we remove the transactions
-    long removed = 0L;
-    for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : 
tableModifications.entrySet()) {
-      Collection<Long> c = keepTxnInfos.get(e.getKey());
-      ConcurrentSkipListSet<Long> updateDeleteForTable = 
updateDeleteTableModifications.get(e.getKey());
-      for (Iterator<Entry<Long, Long>> it = 
e.getValue().entrySet().iterator(); it.hasNext();) {
-        Entry<Long, Long> v = it.next();
-        // We need to check again the time because some of the transactions 
might not be explored
-        // above, e.g., transactions above the highest transaction mark for 
all the materialized
-        // views.
-        if (v.getValue() < minTime && (c.isEmpty() || 
!c.contains(v.getKey()))) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Transaction removed from cache for table {} -> id: {}, 
time: {}",
-                e.getKey(), v.getKey(), v.getValue());
-          }
-          if (updateDeleteForTable != null) {
-            updateDeleteForTable.remove(v.getKey());
-          }
-          it.remove();
-          removed++;
-        }
-      }
-    }
-    return removed;
-  }
-
-  /**
-   * Checks whether the given materialization exists in the invalidation cache.
-   * @param dbName the database name for the materialization
-   * @param tblName the table name for the materialization
-   * @return true if we have information about the materialization in the 
cache,
-   * false otherwise
-   */
-  public boolean containsMaterialization(String dbName, String tblName) {
-    if (disable || dbName == null || tblName == null) {
-      return false;
-    }
-    ConcurrentMap<String, Materialization> dbMaterializations = 
materializations.get(dbName);
-    if (dbMaterializations == null || dbMaterializations.get(tblName) == null) 
{
-      // This is a table
-      return false;
-    }
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
index 8ca9ede..9ce7d6d 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +35,7 @@ public class MaterializationsRebuildLockCleanerTask 
implements MetastoreTaskThre
   private static final Logger LOG = 
LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class);
 
   private Configuration conf;
+  private TxnStore txnHandler;
 
   @Override
   public long runFrequency(TimeUnit unit) {
@@ -41,6 +45,7 @@ public class MaterializationsRebuildLockCleanerTask 
implements MetastoreTaskThre
   @Override
   public void setConf(Configuration configuration) {
     conf = configuration;
+    txnHandler = TxnUtils.getTxnStore(conf);
   }
 
   @Override
@@ -50,11 +55,26 @@ public class MaterializationsRebuildLockCleanerTask 
implements MetastoreTaskThre
 
   @Override
   public void run() {
-    long removedCnt = 
MaterializationsRebuildLockHandler.get().cleanupResourceLocks(
-        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 
TimeUnit.MILLISECONDS));
-    if (removedCnt > 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.info("Number of materialization locks deleted: " + removedCnt);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleaning up materialization rebuild locks");
+    }
+
+    TxnStore.MutexAPI.LockHandle handle = null;
+    try {
+      handle = 
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name());
+      ValidTxnList validTxnList = 
TxnUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+      long removedCnt = 
txnHandler.cleanupMaterializationRebuildLocks(validTxnList,
+          MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 
TimeUnit.MILLISECONDS));
+      if (removedCnt > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Number of materialization locks deleted: " + removedCnt);
+        }
+      }
+    } catch(Throwable t) {
+      LOG.error("Serious error in {}", Thread.currentThread().getName(), ": 
{}" + t.getMessage(), t);
+    } finally {
+      if(handle != null) {
+        handle.releaseLocks();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 3e186b7..fdadf12 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -1327,13 +1327,6 @@ public class ObjectStore implements RawStore, 
Configurable {
     } finally {
       if (!commited) {
         rollbackTransaction();
-      } else {
-        if (MetaStoreUtils.isMaterializedViewTable(tbl)) {
-          // Add to the invalidation cache
-          MaterializationsInvalidationCache.get().createMaterializedView(
-              tbl.getDbName(), tbl.getTableName(), 
tbl.getCreationMetadata().getTablesUsed(),
-              tbl.getCreationMetadata().getValidTxnList());
-        }
       }
     }
   }
@@ -1431,10 +1424,6 @@ public class ObjectStore implements RawStore, 
Configurable {
     } finally {
       if (!success) {
         rollbackTransaction();
-      } else {
-        if (materializedView) {
-          MaterializationsInvalidationCache.get().dropMaterializedView(dbName, 
tableName);
-        }
       }
     }
     return success;
@@ -2278,13 +2267,14 @@ public class ObjectStore implements RawStore, 
Configurable {
     if (m == null) {
       return null;
     }
+    assert !m.isSetMaterializationTime();
     Set<MTable> tablesUsed = new HashSet<>();
     for (String fullyQualifiedName : m.getTablesUsed()) {
       String[] names =  fullyQualifiedName.split("\\.");
       tablesUsed.add(getMTable(m.getCatName(), names[0], names[1], 
false).mtbl);
     }
     return new MCreationMetadata(m.getCatName(), m.getDbName(), m.getTblName(),
-        tablesUsed, m.getValidTxnList());
+        tablesUsed, m.getValidTxnList(), System.currentTimeMillis());
   }
 
   private CreationMetadata convertToCreationMetadata(
@@ -2300,6 +2290,7 @@ public class ObjectStore implements RawStore, 
Configurable {
     }
     CreationMetadata r = new CreationMetadata(s.getCatalogName(),
         s.getDbName(), s.getTblName(), tablesUsed);
+    r.setMaterializationTime(s.getMaterializationTime());
     if (s.getTxnList() != null) {
       r.setValidTxnList(s.getTxnList());
     }
@@ -4075,16 +4066,13 @@ public class ObjectStore implements RawStore, 
Configurable {
       MCreationMetadata newMcm = convertToMCreationMetadata(cm);
       MCreationMetadata mcm = getCreationMetadata(catName, dbname, tablename);
       mcm.setTables(newMcm.getTables());
+      mcm.setMaterializationTime(newMcm.getMaterializationTime());
       mcm.setTxnList(newMcm.getTxnList());
       // commit the changes
       success = commitTransaction();
     } finally {
       if (!success) {
         rollbackTransaction();
-      } else {
-        // Add to the invalidation cache if the creation signature has changed
-        MaterializationsInvalidationCache.get().alterMaterializedView(
-            dbname, tablename, cm.getTablesUsed(), cm.getValidTxnList());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 99d38e2..d5f9721 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
-import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
 import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask;
@@ -760,7 +759,6 @@ public class MetastoreConf {
     TASK_THREADS_ALWAYS("metastore.task.threads.always", 
"metastore.task.threads.always",
         EventCleanerTask.class.getName() + "," + 
RuntimeStatsCleanerTask.class.getName() + "," +
         "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
-        MaterializationsCacheCleanerTask.class.getName() + "," +
             MaterializationsRebuildLockCleanerTask.class.getName() + "," + 
RuntimeStatsCleanerTask.class.getName(),
         "Comma separated list of tasks that will be started in separate 
threads.  These will " +
             "always be started, regardless of whether the metastore is running 
in embedded mode " +

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
index 66b5d48..2d65126 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
@@ -22,8 +22,8 @@ import java.util.Set;
 /**
  * Represents the creation metadata of a materialization.
  * It includes the database and table name for the materialization,
- * the set of tables that it uses, and the valid transaction list
- * when it was created.
+ * the set of tables that it uses, the valid transaction list
+ * when it was created, and the creation/rebuild time.
  */
 public class MCreationMetadata {
 
@@ -32,17 +32,19 @@ public class MCreationMetadata {
   private String tblName;
   private Set<MTable> tables;
   private String txnList;
+  private long materializationTime;
 
   public MCreationMetadata() {
   }
 
   public MCreationMetadata(String catName, String dbName, String tblName,
-      Set<MTable> tables, String txnList) {
+      Set<MTable> tables, String txnList, long materializationTime) {
     this.catalogName = catName;
     this.dbName = dbName;
     this.tblName = tblName;
     this.tables = tables;
     this.txnList = txnList;
+    this.materializationTime = materializationTime;
   }
 
   public Set<MTable> getTables() {
@@ -84,4 +86,12 @@ public class MCreationMetadata {
   public void setTblName(String tblName) {
     this.tblName = tblName;
   }
+
+  public long getMaterializationTime() {
+    return materializationTime;
+  }
+
+  public void setMaterializationTime(long materializationTime) {
+    this.materializationTime = materializationTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 50bfca3..8764c21 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -94,9 +94,9 @@ public final class TxnDbUtil {
           "  CTC_DATABASE varchar(128) NOT NULL," +
           "  CTC_TABLE varchar(128)," +
           "  CTC_PARTITION varchar(767)," +
-          "  CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, 
INCREMENT BY 1) NOT NULL," +
           "  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," +
-          "  CTC_WRITEID bigint)");
+          "  CTC_WRITEID bigint," +
+          "  CTC_UPDATE_DELETE char(1) NOT NULL)");
       stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT 
NULL)");
       stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
 
@@ -194,6 +194,14 @@ public final class TxnDbUtil {
           " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))"
       );
 
+      stmt.execute("CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (" +
+          "  MRL_TXN_ID BIGINT NOT NULL, " +
+          "  MRL_DB_NAME VARCHAR(128) NOT NULL, " +
+          "  MRL_TBL_NAME VARCHAR(256) NOT NULL, " +
+          "  MRL_LAST_HEARTBEAT BIGINT NOT NULL, " +
+          "  PRIMARY KEY(MRL_TXN_ID))"
+      );
+
       try {
         stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" 
(\"SEQUENCE_NAME\" VARCHAR(256) NOT " +
 

http://git-wip-us.apache.org/repos/asf/hive/blob/774a8ef7/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e2a2a39..b2a22f1 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -26,10 +26,10 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Savepoint;
 import java.sql.Statement;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Calendar;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.TimeZone;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
@@ -64,11 +63,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache;
-import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
 import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
@@ -842,10 +840,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   @RetrySemantics.Idempotent("No-op if already committed")
   public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException, MetaException {
-    MaterializationsRebuildLockHandler materializationsRebuildLockHandler =
-        MaterializationsRebuildLockHandler.get();
-    List<TransactionRegistryInfo> txnComponents = new ArrayList<>();
-    boolean isUpdateDelete = false;
+    char isUpdateDelete = 'N';
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
 
@@ -902,7 +897,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           quoteChar(OpertaionType.UPDATE.sqlConst) + "," + 
quoteChar(OpertaionType.DELETE.sqlConst) + ")";
         rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, 
"tc_operation_type " + conflictSQLSuffix));
         if (rs.next()) {
-          isUpdateDelete = true;
+          isUpdateDelete = 'Y';
           close(rs);
           //if here it means currently committing txn performed update/delete 
and we should check WW conflict
           /**
@@ -995,8 +990,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // Move the record from txn_components into completed_txn_components 
so that the compactor
         // knows where to look to compact.
         String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, 
ctc_database, " +
-            "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, 
tc_database, tc_table, " +
-            "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + 
txnid;
+            "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select 
tc_txnid, tc_database, tc_table, " +
+            "tc_partition, tc_writeid, '" + isUpdateDelete + "' from 
TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute insert <" + s + ">");
         int modCount = 0;
         if ((modCount = stmt.executeUpdate(s)) < 1) {
@@ -1005,15 +1000,6 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           LOG.info("Expected to move at least one record from txn_components 
to " +
             "completed_txn_components when committing txn! " + 
JavaUtils.txnIdToString(txnid));
         }
-        // Obtain information that we need to update registry
-        s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from 
COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
-        LOG.debug("Going to extract table modification information for 
invalidation cache <" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          // We only enter in this loop if the transaction actually affected 
any table
-          txnComponents.add(new TransactionRegistryInfo(rs.getString(1), 
rs.getString(2),
-              rs.getLong(3), rs.getTimestamp(4, 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()));
-        }
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         modCount = stmt.executeUpdate(s);
@@ -1028,6 +1014,10 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         modCount = stmt.executeUpdate(s);
         LOG.info("Removed committed transaction: (" + txnid + ") from 
MIN_HISTORY_LEVEL");
 
+        s = "delete from MATERIALIZATION_REBUILD_LOCKS where mrl_txn_id = " + 
txnid;
+        LOG.debug("Going to execute update <" + s + ">");
+        modCount = stmt.executeUpdate(s);
+
         if (rqst.isSetReplPolicy()) {
           s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId 
+
                   " and RTM_REPL_POLICY = " + 
quoteString(rqst.getReplPolicy());
@@ -1040,24 +1030,9 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
                   EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, 
null), dbConn, sqlGenerator);
         }
 
-        MaterializationsInvalidationCache materializationsInvalidationCache =
-            MaterializationsInvalidationCache.get();
-        for (TransactionRegistryInfo info : txnComponents) {
-          if 
(materializationsInvalidationCache.containsMaterialization(info.dbName, 
info.tblName) &&
-              
!materializationsRebuildLockHandler.readyToCommitResource(info.dbName, 
info.tblName, txnid)) {
-            throw new MetaException(
-                "Another process is rebuilding the materialized view " + 
info.fullyQualifiedName);
-          }
-        }
         LOG.debug("Going to commit");
         close(rs);
         dbConn.commit();
-
-        // Update registry with modifications
-        for (TransactionRegistryInfo info : txnComponents) {
-          materializationsInvalidationCache.notifyTableModification(
-              info.dbName, info.tblName, info.writeId, info.timestamp, 
isUpdateDelete);
-        }
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
@@ -1068,9 +1043,6 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
-        for (TransactionRegistryInfo info : txnComponents) {
-          materializationsRebuildLockHandler.unlockResource(info.dbName, 
info.tblName, txnid);
-        }
       }
     } catch (RetryException e) {
       commitTxn(rqst);
@@ -1600,16 +1572,30 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   }
 
   /**
-   * Gets the information of the first transaction for the given table
-   * after the transaction with the input id was committed (if any). 
+   * Get invalidation info for the materialization. Currently, the 
materialization information
+   * only contains information about whether there was update/delete 
operations on the source
+   * tables used by the materialization since it was created.
    */
   @Override
   @RetrySemantics.ReadOnly
-  public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
-      String inputDbName, String inputTableName, ValidWriteIdList txnList)
-          throws MetaException {
-    final List<Long> openTxns = 
Arrays.asList(ArrayUtils.toObject(txnList.getInvalidWriteIds()));
+  public Materialization getMaterializationInvalidationInfo(
+      CreationMetadata creationMetadata, String validTxnListStr) throws 
MetaException {
+    if (creationMetadata.getTablesUsed().isEmpty()) {
+      // Bail out
+      LOG.warn("Materialization creation metadata does not contain any table");
+      return null;
+    }
+
+    // Parse validTxnList
+    final ValidReadTxnList validTxnList =
+        new ValidReadTxnList(validTxnListStr);
 
+    // Parse validReaderWriteIdList from creation metadata
+    final ValidTxnWriteIdList validReaderWriteIdList =
+        new ValidTxnWriteIdList(creationMetadata.getValidTxnList());
+
+    // We are composing a query that returns a single row if an update 
happened after
+    // the materialization was created. Otherwise, query returns 0 rows.
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet rs = null;
@@ -1617,32 +1603,207 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
       stmt.setMaxRows(1);
-      String s = "select ctc_timestamp, ctc_writeid, ctc_database, ctc_table "
-          + "from COMPLETED_TXN_COMPONENTS "
-          + "where ctc_database=" + quoteString(inputDbName) + " and 
ctc_table=" + quoteString(inputTableName)
-          + " and ctc_writeid > " + txnList.getHighWatermark()
-          + (txnList.getInvalidWriteIds().length == 0 ?
-              " " : " or ctc_writeid IN(" + StringUtils.join(",", openTxns) + 
") ")
-          + "order by ctc_timestamp asc";
+      StringBuilder query = new StringBuilder();
+      // compose a query that select transactions containing an update...
+      query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS 
where ctc_update_delete='Y' AND (");
+      int i = 0;
+      for (String fullyQualifiedName : creationMetadata.getTablesUsed()) {
+        // ...for each of the tables that are part of the materialized view,
+        // where the transaction had to be committed after the materialization 
was created...
+        if (i != 0) {
+          query.append("OR");
+        }
+        String[] names = TxnUtils.getDbTableName(fullyQualifiedName);
+        query.append(" (ctc_database=" + quoteString(names[0]) + " AND 
ctc_table=" + quoteString(names[1]));
+        ValidWriteIdList tblValidWriteIdList =
+            
validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
+        if (tblValidWriteIdList == null) {
+          LOG.warn("ValidWriteIdList for table {} not present in creation 
metadata, this should not happen");
+          return null;
+        }
+        query.append(" AND (ctc_writeid > " + 
tblValidWriteIdList.getHighWatermark());
+        query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") 
" :
+            " OR ctc_writeid IN(" + StringUtils.join(",",
+                
Arrays.asList(ArrayUtils.toObject(tblValidWriteIdList.getInvalidWriteIds()))) + 
") ");
+        query.append(") ");
+        i++;
+      }
+      // ... and where the transaction has already been committed as per 
snapshot taken
+      // when we are running current query
+      query.append(") AND ctc_txnid <= " + validTxnList.getHighWatermark());
+      query.append(validTxnList.getInvalidTransactions().length == 0 ? " " :
+          " AND ctc_txnid NOT IN(" + StringUtils.join(",",
+              
Arrays.asList(ArrayUtils.toObject(validTxnList.getInvalidTransactions()))) + ") 
");
+
+      // Execute query
+      String s = query.toString();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + s + ">");
       }
       rs = stmt.executeQuery(s);
 
-      if(!rs.next()) {
-        return new BasicTxnInfo(true);
+      return new Materialization(rs.next());
+    } catch (SQLException ex) {
+      LOG.warn("getMaterializationInvalidationInfo failed due to " + 
getMessage(ex), ex);
+      throw new MetaException("Unable to retrieve materialization invalidation 
information due to " +
+          StringUtils.stringifyException(ex));
+    } finally {
+      close(rs, stmt, dbConn);
+    }
+  }
+
+  @Override
+  public LockResponse lockMaterializationRebuild(String dbName, String 
tableName, long txnId)
+      throws MetaException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Acquiring lock for materialization rebuild with txnId={} for 
{}", txnId, Warehouse.getQualifiedName(dbName,tableName));
+    }
+
+    TxnStore.MutexAPI.LockHandle handle = null;
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      lockInternal();
+      /**
+       * MUTEX_KEY.MaterializationRebuild lock ensures that there is only 1 
entry in
+       * Initiated/Working state for any resource. This ensures we do not run 
concurrent
+       * rebuild operations on any materialization.
+       */
+      handle = 
getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name());
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      stmt = dbConn.createStatement();
+
+      String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS 
where" +
+          " mrl_db_name =" + quoteString(dbName) +
+          " AND mrl_tbl_name=" + quoteString(tableName);
+      LOG.debug("Going to execute query <" + selectQ + ">");
+      rs = stmt.executeQuery(selectQ);
+      if(rs.next()) {
+        LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName +
+            " since it is already being rebuilt");
+        return new LockResponse(txnId, LockState.NOT_ACQUIRED);
       }
-      final BasicTxnInfo txnInfo = new BasicTxnInfo(false);
-      txnInfo.setTime(rs.getTimestamp(1, 
Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
-      txnInfo.setTxnid(rs.getLong(2));
-      txnInfo.setDbname(rs.getString(3));
-      txnInfo.setTablename(rs.getString(4));
-      return txnInfo;
+      String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " +
+          "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values 
(" + txnId +
+          ", '" + dbName + "', '" + tableName + "', " + 
Instant.now().toEpochMilli() + ")";
+      LOG.debug("Going to execute update <" + insertQ + ">");
+      stmt.executeUpdate(insertQ);
+      LOG.debug("Going to commit");
+      dbConn.commit();
+      return new LockResponse(txnId, LockState.ACQUIRED);
     } catch (SQLException ex) {
-      LOG.warn("getLastCompletedTransactionForTable failed due to " + 
getMessage(ex), ex);
-      throw new MetaException("Unable to retrieve commits information due to " 
+ StringUtils.stringifyException(ex));
+      LOG.warn("lockMaterializationRebuild failed due to " + getMessage(ex), 
ex);
+      throw new MetaException("Unable to retrieve materialization invalidation 
information due to " +
+          StringUtils.stringifyException(ex));
     } finally {
       close(rs, stmt, dbConn);
+      if(handle != null) {
+        handle.releaseLocks();
+      }
+      unlockInternal();
+    }
+  }
+
+  @Override
+  public boolean heartbeatLockMaterializationRebuild(String dbName, String 
tableName, long txnId)
+      throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update MATERIALIZATION_REBUILD_LOCKS" +
+            " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() +
+            " where mrl_txn_id = " + txnId +
+            " AND mrl_db_name =" + quoteString(dbName) +
+            " AND mrl_tbl_name=" + quoteString(tableName);
+        LOG.debug("Going to execute update <" + s + ">");
+        int rc = stmt.executeUpdate(s);
+        if (rc < 1) {
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+          LOG.info("No lock found for rebuild of " + 
Warehouse.getQualifiedName(dbName, tableName) +
+              " when trying to heartbeat");
+          // It could not be renewed, return that information
+          return false;
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        // It could be renewed, return that information
+        return true;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e,
+            "heartbeatLockMaterializationRebuild(" + 
Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")");
+        throw new MetaException("Unable to heartbeat rebuild lock due to " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return heartbeatLockMaterializationRebuild(dbName, tableName ,txnId);
+    }
+  }
+
+  @Override
+  public long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, 
long timeout) throws MetaException {
+    try {
+      // Aux values
+      long cnt = 0L;
+      List<Long> txnIds = new ArrayList<>();
+      long timeoutTime = Instant.now().toEpochMilli() - timeout;
+
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        String selectQ = "select mrl_txn_id, mrl_last_heartbeat from 
MATERIALIZATION_REBUILD_LOCKS";
+        LOG.debug("Going to execute query <" + selectQ + ">");
+        rs = stmt.executeQuery(selectQ);
+        while(rs.next()) {
+          long lastHeartbeat = rs.getLong(2);
+          if (lastHeartbeat < timeoutTime) {
+            // The heartbeat has timeout, double check whether we can remove it
+            long txnId = rs.getLong(1);
+            if (validTxnList.isTxnValid(txnId) || 
validTxnList.isTxnAborted(txnId)) {
+              // Txn was committed (but notification was not received) or it 
was aborted.
+              // Either case, we can clean it up
+              txnIds.add(txnId);
+            }
+          }
+        }
+        if (!txnIds.isEmpty()) {
+          String deleteQ = "delete from MATERIALIZATION_REBUILD_LOCKS where" +
+              " mrl_txn_id IN(" + StringUtils.join(",", txnIds) + ") ";
+          LOG.debug("Going to execute update <" + deleteQ + ">");
+          cnt = stmt.executeUpdate(deleteQ);
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return cnt;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanupMaterializationRebuildLocks");
+        throw new MetaException("Unable to clean rebuild locks due to " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return cleanupMaterializationRebuildLocks(validTxnList, timeout);
     }
   }
 
@@ -1915,6 +2076,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   private static String normalizeCase(String s) {
     return s == null ? null : s.toLowerCase();
   }
+
   private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, 
long txnId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, 
MetaException {
     try {
@@ -4802,20 +4964,4 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   };
 
-  private class TransactionRegistryInfo {
-    final String dbName;
-    final String tblName;
-    final String fullyQualifiedName;
-    final long writeId;
-    final long timestamp;
-
-    public TransactionRegistryInfo (String dbName, String tblName, long 
writeId, long timestamp) {
-      this.dbName = dbName;
-      this.tblName = tblName;
-      this.fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName);
-      this.writeId = writeId;
-      this.timestamp = timestamp;
-    }
-  }
-
 }

Reply via email to