Repository: hive Updated Branches: refs/heads/master 4a33ec8fc -> 0a328f030
http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 3a11a05..bc58cfe 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -827,7 +827,7 @@ class Table PRIVILEGES => {:type => ::Thrift::Types::STRUCT, :name => 'privileges', :class => ::PrincipalPrivilegeSet, :optional => true}, TEMPORARY => {:type => ::Thrift::Types::BOOL, :name => 'temporary', :default => false, :optional => true}, REWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'rewriteEnabled', :optional => true}, - CREATIONMETADATA => {:type => ::Thrift::Types::MAP, :name => 'creationMetadata', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::BasicTxnInfo}, :optional => true} + CREATIONMETADATA => {:type => ::Thrift::Types::STRUCT, :name => 'creationMetadata', :class => ::CreationMetadata, :optional => true} } def struct_fields; FIELDS; end @@ -2673,16 +2673,14 @@ end class BasicTxnInfo include ::Thrift::Struct, ::Thrift::Struct_Union ISNULL = 1 - ID = 2 - TIME = 3 - TXNID = 4 - DBNAME = 5 - TABLENAME = 6 - PARTITIONNAME = 7 + TIME = 2 + TXNID = 3 + DBNAME = 4 + TABLENAME = 5 + PARTITIONNAME = 6 FIELDS = { ISNULL => {:type => ::Thrift::Types::BOOL, :name => 'isnull'}, - ID => {:type => ::Thrift::Types::I64, :name => 'id', :optional => true}, TIME => {:type => ::Thrift::Types::I64, :name => 'time', :optional => true}, TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname', :optional => true}, @@ -2699,21 +2697,26 @@ class BasicTxnInfo ::Thrift::Struct.generate_accessors self end -class TxnsSnapshot +class CreationMetadata include ::Thrift::Struct, ::Thrift::Struct_Union - TXN_HIGH_WATER_MARK = 1 - OPEN_TXNS = 2 + DBNAME = 1 + TBLNAME = 2 + TABLESUSED = 3 + VALIDTXNLIST = 4 FIELDS = { - TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'}, - OPEN_TXNS => {:type => ::Thrift::Types::LIST, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}} + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, + TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}, + TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}}, + VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true} } def struct_fields; FIELDS; end def validate - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field txn_high_water_mark is unset!') unless @txn_high_water_mark - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field open_txns is unset!') unless @open_txns + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed end ::Thrift::Struct.generate_accessors self http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index a788c08..ec88131 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -2546,36 +2546,6 @@ module ThriftHiveMetastore return end - def get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot) - send_get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot) - return recv_get_last_completed_transaction_for_tables() - end - - def send_get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot) - send_message('get_last_completed_transaction_for_tables', Get_last_completed_transaction_for_tables_args, :db_names => db_names, :table_names => table_names, :txns_snapshot => txns_snapshot) - end - - def recv_get_last_completed_transaction_for_tables() - result = receive_message(Get_last_completed_transaction_for_tables_result) - return result.success unless result.success.nil? - raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_last_completed_transaction_for_tables failed: unknown result') - end - - def get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot) - send_get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot) - return recv_get_last_completed_transaction_for_table() - end - - def send_get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot) - send_message('get_last_completed_transaction_for_table', Get_last_completed_transaction_for_table_args, :db_name => db_name, :table_name => table_name, :txns_snapshot => txns_snapshot) - end - - def recv_get_last_completed_transaction_for_table() - result = receive_message(Get_last_completed_transaction_for_table_result) - return result.success unless result.success.nil? - raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_last_completed_transaction_for_table failed: unknown result') - end - def get_next_notification(rqst) send_get_next_notification(rqst) return recv_get_next_notification() @@ -4986,20 +4956,6 @@ module ThriftHiveMetastore write_result(result, oprot, 'add_dynamic_partitions', seqid) end - def process_get_last_completed_transaction_for_tables(seqid, iprot, oprot) - args = read_args(iprot, Get_last_completed_transaction_for_tables_args) - result = Get_last_completed_transaction_for_tables_result.new() - result.success = @handler.get_last_completed_transaction_for_tables(args.db_names, args.table_names, args.txns_snapshot) - write_result(result, oprot, 'get_last_completed_transaction_for_tables', seqid) - end - - def process_get_last_completed_transaction_for_table(seqid, iprot, oprot) - args = read_args(iprot, Get_last_completed_transaction_for_table_args) - result = Get_last_completed_transaction_for_table_result.new() - result.success = @handler.get_last_completed_transaction_for_table(args.db_name, args.table_name, args.txns_snapshot) - write_result(result, oprot, 'get_last_completed_transaction_for_table', seqid) - end - def process_get_next_notification(seqid, iprot, oprot) args = read_args(iprot, Get_next_notification_args) result = Get_next_notification_result.new() @@ -11088,78 +11044,6 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end - class Get_last_completed_transaction_for_tables_args - include ::Thrift::Struct, ::Thrift::Struct_Union - DB_NAMES = 1 - TABLE_NAMES = 2 - TXNS_SNAPSHOT = 3 - - FIELDS = { - DB_NAMES => {:type => ::Thrift::Types::LIST, :name => 'db_names', :element => {:type => ::Thrift::Types::STRING}}, - TABLE_NAMES => {:type => ::Thrift::Types::LIST, :name => 'table_names', :element => {:type => ::Thrift::Types::STRING}}, - TXNS_SNAPSHOT => {:type => ::Thrift::Types::STRUCT, :name => 'txns_snapshot', :class => ::TxnsSnapshot} - } - - def struct_fields; FIELDS; end - - def validate - end - - ::Thrift::Struct.generate_accessors self - end - - class Get_last_completed_transaction_for_tables_result - include ::Thrift::Struct, ::Thrift::Struct_Union - SUCCESS = 0 - - FIELDS = { - SUCCESS => {:type => ::Thrift::Types::LIST, :name => 'success', :element => {:type => ::Thrift::Types::STRUCT, :class => ::BasicTxnInfo}} - } - - def struct_fields; FIELDS; end - - def validate - end - - ::Thrift::Struct.generate_accessors self - end - - class Get_last_completed_transaction_for_table_args - include ::Thrift::Struct, ::Thrift::Struct_Union - DB_NAME = 1 - TABLE_NAME = 2 - TXNS_SNAPSHOT = 3 - - FIELDS = { - DB_NAME => {:type => ::Thrift::Types::STRING, :name => 'db_name'}, - TABLE_NAME => {:type => ::Thrift::Types::STRING, :name => 'table_name'}, - TXNS_SNAPSHOT => {:type => ::Thrift::Types::STRUCT, :name => 'txns_snapshot', :class => ::TxnsSnapshot} - } - - def struct_fields; FIELDS; end - - def validate - end - - ::Thrift::Struct.generate_accessors self - end - - class Get_last_completed_transaction_for_table_result - include ::Thrift::Struct, ::Thrift::Struct_Union - SUCCESS = 0 - - FIELDS = { - SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::BasicTxnInfo} - } - - def struct_fields; FIELDS; end - - def validate - end - - ::Thrift::Struct.generate_accessors self - end - class Get_next_notification_args include ::Thrift::Struct, ::Thrift::Struct_Union RQST = 1 http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ecc4644..8dc9b6a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6951,19 +6951,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { } @Override - public List<BasicTxnInfo> get_last_completed_transaction_for_tables( - List<String> dbNames, List<String> tableNames, TxnsSnapshot txnsSnapshot) - throws TException { - return getTxnHandler().getLastCompletedTransactionForTables(dbNames, tableNames, txnsSnapshot); - } - - @Override - public BasicTxnInfo get_last_completed_transaction_for_table(String dbName, String tableName, TxnsSnapshot txnsSnapshot) - throws TException { - return getTxnHandler().getLastCompletedTransactionForTable(dbName, tableName, txnsSnapshot); - } - - @Override public NotificationEventsCountResponse get_notification_events_count(NotificationEventsCountRequest rqst) throws TException { try { http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index a3cb17b..2e76e17 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2170,25 +2170,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override - public List<BasicTxnInfo> getLastCompletedTransactionForTables( - List<String> dbNames, List<String> tableNames, ValidTxnList txnList) - throws TException { - TxnsSnapshot txnsSnapshot = new TxnsSnapshot(); - txnsSnapshot.setTxn_high_water_mark(txnList.getHighWatermark()); - txnsSnapshot.setOpen_txns(Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions()))); - return client.get_last_completed_transaction_for_tables(dbNames, tableNames, txnsSnapshot); - } - - @Override - public BasicTxnInfo getLastCompletedTransactionForTable(String dbName, String tableName, ValidTxnList txnList) - throws TException { - TxnsSnapshot txnsSnapshot = new TxnsSnapshot(); - txnsSnapshot.setTxn_high_water_mark(txnList.getHighWatermark()); - txnsSnapshot.setOpen_txns(Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions()))); - return client.get_last_completed_transaction_for_table(dbName, tableName, txnsSnapshot); - } - - @Override public ValidTxnList getValidTxns() throws TException { return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0); } http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 8ec8b3b..96d4590 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -1338,24 +1338,6 @@ public interface IMetaStoreClient { throws MetaException, TException; /** - * Get the last completed transaction for the given tables. Although transactions in Hive - * might happen concurrently, the order is based on the actual commit to the metastore - * table holding the completed transactions. - */ - @InterfaceAudience.LimitedPrivate({"HCatalog"}) - List<BasicTxnInfo> getLastCompletedTransactionForTables(List<String> dbNames, List<String> tableNames, ValidTxnList txnList) - throws TException; - - /** - * Get the last completed transaction for the given table. Although transactions in Hive - * might happen concurrently, the order is based on the actual commit to the metastore - * table holding the completed transactions. - */ - @InterfaceAudience.LimitedPrivate({"HCatalog"}) - BasicTxnInfo getLastCompletedTransactionForTable(String dbName, String tableName, ValidTxnList txnList) - throws TException; - - /** * Get a structure that details valid transactions. * @return list of valid transactions * @throws TException http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java index de912d5..20e4e8d 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java @@ -17,15 +17,18 @@ */ package org.apache.hadoop.hive.metastore; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; import org.apache.hadoop.hive.metastore.api.Materialization; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -65,8 +68,8 @@ public final class MaterializationsInvalidationCache { * happen. This is useful to quickly check the invalidation time for a given materialized * view. */ - private final ConcurrentMap<String, ConcurrentSkipListSet<TableModificationKey>> tableModifications = - new ConcurrentHashMap<String, ConcurrentSkipListSet<TableModificationKey>>(); + private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications = + new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>(); /* Whether the cache has been initialized or not. */ private boolean initialized; @@ -112,7 +115,7 @@ public final class MaterializationsInvalidationCache { try { for (String dbName : store.getAllDatabases()) { for (Table mv : store.getTableObjectsByName(dbName, store.getTables(dbName, null, TableType.MATERIALIZED_VIEW))) { - addMaterializedView(mv, ImmutableSet.copyOf(mv.getCreationMetadata().keySet()), OpType.LOAD); + addMaterializedView(mv, ImmutableSet.copyOf(mv.getCreationMetadata().getTablesUsed()), OpType.LOAD); } } LOG.info("Initialized materializations invalidation cache"); @@ -160,53 +163,46 @@ public final class MaterializationsInvalidationCache { // Start the process to add materialization to the cache // Before loading the materialization in the cache, we need to update some // important information in the registry to account for rewriting invalidation - for (String qNameTableUsed : tablesUsed) { - // First we insert a new tree set to keep table modifications, unless it already exists - ConcurrentSkipListSet<TableModificationKey> modificationsTree = - new ConcurrentSkipListSet<TableModificationKey>(); - final ConcurrentSkipListSet<TableModificationKey> prevModificationsTree = tableModifications.putIfAbsent( - qNameTableUsed, modificationsTree); - if (prevModificationsTree != null) { - modificationsTree = prevModificationsTree; - } - // We obtain the access time to the table when the materialized view was created. - // This is a map from table fully qualified name to last modification before MV creation. - BasicTxnInfo e = materializedViewTable.getCreationMetadata().get(qNameTableUsed); - if (e.isIsnull()) { - // This can happen when the materialized view was created on non-transactional tables - // with rewrite disabled but then it was enabled by alter statement - continue; - } - final TableModificationKey lastModificationBeforeCreation = - new TableModificationKey(e.getId(), e.getTime()); - modificationsTree.add(lastModificationBeforeCreation); - if (opType == OpType.LOAD) { + String txnListString = materializedViewTable.getCreationMetadata().getValidTxnList(); + if (txnListString == null) { + // This can happen when the materialized view was created on non-transactional tables + return; + } + if (opType == OpType.CREATE || opType == OpType.ALTER) { + // You store the materialized view + cq.put(materializedViewTable.getTableName(), + new MaterializationInvalidationInfo(materializedViewTable, tablesUsed)); + } else { + ValidTxnList txnList = new ValidReadTxnList(txnListString); + for (String qNameTableUsed : tablesUsed) { + // First we insert a new tree set to keep table modifications, unless it already exists + ConcurrentSkipListMap<Long, Long> modificationsTree = + new ConcurrentSkipListMap<Long, Long>(); + final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent( + qNameTableUsed, modificationsTree); + if (prevModificationsTree != null) { + modificationsTree = prevModificationsTree; + } // If we are not creating the MV at this instant, but instead it was created previously - // and we are loading it into the cache, we need to go through the transaction logs and + // and we are loading it into the cache, we need to go through the transaction entries and // check if the MV is still valid. try { String[] names = qNameTableUsed.split("\\."); - BasicTxnInfo e2 = txnStore.getFirstCompletedTransactionForTableAfterCommit( - names[0], names[1], lastModificationBeforeCreation.id); - if (!e2.isIsnull()) { - modificationsTree.add(new TableModificationKey(e2.getId(), e2.getTime())); + BasicTxnInfo e = txnStore.getFirstCompletedTransactionForTableAfterCommit( + names[0], names[1], txnList); + if (!e.isIsnull()) { + modificationsTree.put(e.getTxnid(), e.getTime()); // We do not need to do anything more for current table, as we detected // a modification event that was in the metastore. continue; } } catch (MetaException ex) { LOG.debug("Materialized view " + - Warehouse.getQualifiedName(materializedViewTable.getDbName(), materializedViewTable.getTableName()) + - " ignored; error loading view into invalidation cache", ex); + Warehouse.getQualifiedName(materializedViewTable.getDbName(), materializedViewTable.getTableName()) + + " ignored; error loading view into invalidation cache", ex); return; } } - } - if (opType == OpType.CREATE || opType == OpType.ALTER) { - // You store the materialized view - cq.put(materializedViewTable.getTableName(), - new MaterializationInvalidationInfo(materializedViewTable, tablesUsed)); - } else { // For LOAD, you only add it if it does exist as you might be loading an outdated MV cq.putIfAbsent(materializedViewTable.getTableName(), new MaterializationInvalidationInfo(materializedViewTable, tablesUsed)); @@ -218,23 +214,23 @@ public final class MaterializationsInvalidationCache { } /** - * This method is called when a table is modified. That way we can keep a track of the + * This method is called when a table is modified. That way we can keep track of the * invalidation for the MVs that use that table. */ public void notifyTableModification(String dbName, String tableName, - long eventId, long newModificationTime) { + long txnId, long newModificationTime) { if (LOG.isDebugEnabled()) { LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}", - tableName, dbName, eventId, newModificationTime); + tableName, dbName, txnId, newModificationTime); } - ConcurrentSkipListSet<TableModificationKey> modificationsTree = - new ConcurrentSkipListSet<TableModificationKey>(); - final ConcurrentSkipListSet<TableModificationKey> prevModificationsTree = + ConcurrentSkipListMap<Long, Long> modificationsTree = + new ConcurrentSkipListMap<Long, Long>(); + final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree); if (prevModificationsTree != null) { modificationsTree = prevModificationsTree; } - modificationsTree.add(new TableModificationKey(eventId, newModificationTime)); + modificationsTree.put(txnId, newModificationTime); } /** @@ -296,69 +292,49 @@ public final class MaterializationsInvalidationCache { } private long getInvalidationTime(MaterializationInvalidationInfo materialization) { + String txnListString = materialization.getMaterializationTable().getCreationMetadata().getValidTxnList(); + if (txnListString == null) { + // This can happen when the materialization was created on non-transactional tables + return Long.MIN_VALUE; + } + + // We will obtain the modification time as follows. + // First, we obtain the first element after high watermark (if any) + // Then, we iterate through the elements from min open txn till high + // watermark, updating the modification time after creation if needed + ValidTxnList txnList = new ValidReadTxnList(txnListString); long firstModificationTimeAfterCreation = 0L; for (String qNameTableUsed : materialization.getTablesUsed()) { - BasicTxnInfo e = materialization.getMaterializationTable().getCreationMetadata().get(qNameTableUsed); - if (e == null) { - // This can happen when the materialized view was created on non-transactional tables - // with rewrite disabled but then it was enabled by alter statement - return Long.MIN_VALUE; - } - final TableModificationKey lastModificationBeforeCreation = - new TableModificationKey(e.getId(), e.getTime()); - final TableModificationKey post = tableModifications.get(qNameTableUsed) - .higher(lastModificationBeforeCreation); - if (post != null) { + final Long tn = tableModifications.get(qNameTableUsed) + .higherKey(txnList.getHighWatermark()); + if (tn != null) { if (firstModificationTimeAfterCreation == 0L || - post.time < firstModificationTimeAfterCreation) { - firstModificationTimeAfterCreation = post.time; + tn < firstModificationTimeAfterCreation) { + firstModificationTimeAfterCreation = tn; } } - } - return firstModificationTimeAfterCreation; - } - - private static class TableModificationKey implements Comparable<TableModificationKey> { - private long id; - private long time; - - private TableModificationKey(long id, long time) { - this.id = id; - this.time = time; - } - - @Override - public boolean equals(Object obj) { - if(this == obj) { - return true; - } - if((obj == null) || (obj.getClass() != this.getClass())) { - return false; - } - TableModificationKey tableModificationKey = (TableModificationKey) obj; - return id == tableModificationKey.id && time == tableModificationKey.time; - } - - @Override - public int hashCode() { - int hash = 7; - hash = 31 * hash + Long.hashCode(id); - hash = 31 * hash + Long.hashCode(time); - return hash; - } - - @Override - public int compareTo(TableModificationKey other) { - if (id == other.id) { - return Long.compare(time, other.time); + // Min open txn might be null if there were no open transactions + // when this transaction was being executed + if (txnList.getMinOpenTxn() != null) { + // Invalid transaction list is sorted + int pos = 0; + for (Map.Entry<Long, Long> t : tableModifications.get(qNameTableUsed) + .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) { + while (pos < txnList.getInvalidTransactions().length && + txnList.getInvalidTransactions()[pos] != t.getKey()) { + pos++; + } + if (pos >= txnList.getInvalidTransactions().length) { + break; + } + if (firstModificationTimeAfterCreation == 0L || + t.getValue() < firstModificationTimeAfterCreation) { + firstModificationTimeAfterCreation = t.getValue(); + } + } } - return Long.compare(id, other.id); - } - - @Override - public String toString() { - return "TableModificationKey{" + id + "," + time + "}"; } + return firstModificationTimeAfterCreation; } private enum OpType { http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b3d99a1..3d1c67f 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -68,7 +68,6 @@ import javax.jdo.identity.IntIdentity; import javax.sql.DataSource; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -80,10 +79,10 @@ import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.BasicTxnInfo; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -149,6 +148,7 @@ import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.model.MColumnDescriptor; import org.apache.hadoop.hive.metastore.model.MConstraint; +import org.apache.hadoop.hive.metastore.model.MCreationMetadata; import org.apache.hadoop.hive.metastore.model.MDBPrivilege; import org.apache.hadoop.hive.metastore.model.MDatabase; import org.apache.hadoop.hive.metastore.model.MDelegationToken; @@ -193,10 +193,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.ObjectPair; -import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TJSONProtocol; import org.datanucleus.AbstractNucleusContext; import org.datanucleus.ClassLoaderResolver; import org.datanucleus.ClassLoaderResolverImpl; @@ -1150,7 +1147,7 @@ public class ObjectStore implements RawStore, Configurable { if (MetaStoreUtils.isMaterializedViewTable(tbl)) { // Add to the invalidation cache MaterializationsInvalidationCache.get().createMaterializedView( - tbl, tbl.getCreationMetadata().keySet()); + tbl, tbl.getCreationMetadata().getTablesUsed()); } } } @@ -1236,6 +1233,14 @@ public class ObjectStore implements RawStore, Configurable { } preDropStorageDescriptor(tbl.getSd()); + + if (tbl.getCreationMetadata() != null) { + // Remove creation metadata + MCreationMetadata mcm = tbl.getCreationMetadata(); + tbl.setCreationMetadata(null); + pm.deletePersistent(mcm); + } + // then remove the table pm.deletePersistentAll(tbl); } @@ -1552,6 +1557,11 @@ public class ObjectStore implements RawStore, Configurable { pm.retrieveAll(mtbl.getSd().getCD()); nmtbl.mcd = mtbl.getSd().getCD(); } + // Retrieve creation metadata if needed + if (mtbl != null && + TableType.MATERIALIZED_VIEW.toString().equals(mtbl.getTableType())) { + mtbl.setCreationMetadata(getCreationMetadata(db, table)); + } commited = commitTransaction(); } finally { rollbackAndCleanup(commited, query); @@ -1560,6 +1570,25 @@ public class ObjectStore implements RawStore, Configurable { return nmtbl; } + private MCreationMetadata getCreationMetadata(String dbName, String tblName) { + boolean commited = false; + MCreationMetadata mcm = null; + Query query = null; + try { + openTransaction(); + query = pm.newQuery( + MCreationMetadata.class, "tblName == table && dbName == db"); + query.declareParameters("java.lang.String table, java.lang.String db"); + query.setUnique(true); + mcm = (MCreationMetadata) query.execute(tblName, dbName); + pm.retrieve(mcm); + commited = commitTransaction(); + } finally { + rollbackAndCleanup(commited, query); + } + return mcm; + } + private MTable getMTable(String db, String table) { AttachedMTableInfo nmtbl = getMTable(db, table, false); return nmtbl.mtbl; @@ -1885,56 +1914,37 @@ public class ObjectStore implements RawStore, Configurable { .getSkewedColValueLocationMaps()), sd.isStoredAsSubDirectories()); } - private Map<String, String> convertToMCreationMetadata( - Map<String, BasicTxnInfo> m) throws MetaException { + private MCreationMetadata convertToMCreationMetadata( + CreationMetadata m) throws MetaException { if (m == null) { return null; } - Map<String, String> r = new HashMap<>(); - for (Entry<String, BasicTxnInfo> e : m.entrySet()) { - r.put(e.getKey(), serializeBasicTransactionInfo(e.getValue())); + Set<MTable> tablesUsed = new HashSet<>(); + for (String fullyQualifiedName : m.getTablesUsed()) { + String[] names = fullyQualifiedName.split("\\."); + tablesUsed.add(getMTable(names[0], names[1], false).mtbl); } - return r; + return new MCreationMetadata(m.getDbName(), m.getTblName(), + tablesUsed, m.getValidTxnList()); } - private Map<String, BasicTxnInfo> convertToCreationMetadata( - Map<String, String> m) throws MetaException { - if (m == null) { + private CreationMetadata convertToCreationMetadata( + MCreationMetadata s) throws MetaException { + if (s == null) { return null; } - Map<String, BasicTxnInfo> r = new HashMap<>(); - for (Entry<String, String> e : m.entrySet()) { - r.put(e.getKey(), deserializeBasicTransactionInfo(e.getValue())); - } - return r; - } - - private String serializeBasicTransactionInfo(BasicTxnInfo entry) - throws MetaException { - if (entry.isIsnull()) { - return ""; - } - try { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(entry, "UTF-8"); - } catch (TException e) { - throw new MetaException("Error serializing object " + entry + ": " + e.toString()); - } - } - - private BasicTxnInfo deserializeBasicTransactionInfo(String s) - throws MetaException { - if (s.equals("")) { - return new BasicTxnInfo(true); + Set<String> tablesUsed = new HashSet<>(); + for (MTable mtbl : s.getTables()) { + tablesUsed.add( + Warehouse.getQualifiedName( + mtbl.getDatabase().getName(), mtbl.getTableName())); } - try { - TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory()); - BasicTxnInfo r = new BasicTxnInfo(); - deserializer.deserialize(r, s, "UTF-8"); - return r; - } catch (TException e) { - throw new MetaException("Error deserializing object " + s + ": " + e.toString()); + CreationMetadata r = new CreationMetadata( + s.getDbName(), s.getTblName(), tablesUsed); + if (s.getTxnList() != null) { + r.setValidTxnList(s.getTxnList()); } + return r; } @Override @@ -3703,9 +3713,10 @@ public class ObjectStore implements RawStore, Configurable { oldt.setViewOriginalText(newt.getViewOriginalText()); oldt.setViewExpandedText(newt.getViewExpandedText()); oldt.setRewriteEnabled(newt.isRewriteEnabled()); - registerCreationSignature = !MapUtils.isEmpty(newt.getCreationMetadata()); + registerCreationSignature = newt.getCreationMetadata() != null; if (registerCreationSignature) { - oldt.setCreationMetadata(newt.getCreationMetadata()); + oldt.getCreationMetadata().setTables(newt.getCreationMetadata().getTables()); + oldt.getCreationMetadata().setTxnList(newt.getCreationMetadata().getTxnList()); } // commit the changes @@ -3718,7 +3729,7 @@ public class ObjectStore implements RawStore, Configurable { registerCreationSignature) { // Add to the invalidation cache if the creation signature has changed MaterializationsInvalidationCache.get().alterMaterializedView( - newTable, newTable.getCreationMetadata().keySet()); + newTable, newTable.getCreationMetadata().getTablesUsed()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java new file mode 100644 index 0000000..1133cb1 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.model; + +import java.util.Set; + +/** + * Represents the creation metadata of a materialization. + * It includes the database and table name for the materialization, + * the set of tables that it uses, and the valid transaction list + * when it was created. + */ +public class MCreationMetadata { + + private String dbName; + private String tblName; + private Set<MTable> tables; + private String txnList; + + public MCreationMetadata() { + } + + public MCreationMetadata(String dbName, String tblName, + Set<MTable> tables, String txnList) { + this.dbName = dbName; + this.tblName = tblName; + this.tables = tables; + this.txnList = txnList; + } + + public Set<MTable> getTables() { + return tables; + } + + public void setTables(Set<MTable> tables) { + this.tables = tables; + } + + public String getTxnList() { + return txnList; + } + + public void setTxnList(String txnList) { + this.txnList = txnList; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getTblName() { + return tblName; + } + + public void setTblName(String tblName) { + this.tblName = tblName; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java index 6c40ae8..aea16ad 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java @@ -35,7 +35,7 @@ public class MTable { private String viewOriginalText; private String viewExpandedText; private boolean rewriteEnabled; - private Map<String, String> creationMetadata; + private MCreationMetadata creationMetadata; private String tableType; public MTable() {} @@ -57,7 +57,7 @@ public class MTable { public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner, int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys, Map<String, String> parameters, String viewOriginalText, String viewExpandedText, - boolean rewriteEnabled, Map<String, String> creationMetadata, + boolean rewriteEnabled, MCreationMetadata creationMetadata, String tableType) { this.tableName = tableName; this.database = database; @@ -176,14 +176,14 @@ public class MTable { /** * @return the metadata information related to a materialized view creation */ - public Map<String, String> getCreationMetadata() { + public MCreationMetadata getCreationMetadata() { return creationMetadata; } /** * @param creationMetadata the metadata information to set */ - public void setCreationMetadata(Map<String, String> creationMetadata) { + public void setCreationMetadata(MCreationMetadata creationMetadata) { this.creationMetadata = creationMetadata; } http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 3a558b4..1bb976c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -27,6 +27,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Calendar; import java.util.Collections; @@ -54,11 +55,13 @@ import org.apache.commons.dbcp.ConnectionFactory; import org.apache.commons.dbcp.DriverManagerConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.commons.dbcp.PoolingDataSource; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; @@ -103,7 +106,6 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.TxnState; -import org.apache.hadoop.hive.metastore.api.TxnsSnapshot; import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -840,13 +842,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn.commit(); // Update registry with modifications - s = "select ctc_database, ctc_table, ctc_id, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; + s = "select ctc_database, ctc_table, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; rs = stmt.executeQuery(s); if (rs.next()) { LOG.debug("Going to register table modification in invalidation cache <" + s + ">"); MaterializationsInvalidationCache.get().notifyTableModification( - rs.getString(1), rs.getString(2), rs.getLong(3), - rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); + rs.getString(1), rs.getString(2), txnid, + rs.getTimestamp(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); } close(rs); dbConn.commit(); @@ -913,73 +915,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** - * Gets the information of the last transaction committed for the input table - * given the transaction snapshot provided. - */ - @Override - @RetrySemantics.ReadOnly - public List<BasicTxnInfo> getLastCompletedTransactionForTables( - List<String> dbNames, List<String> tableNames, TxnsSnapshot txnsSnapshot) throws MetaException { - List<BasicTxnInfo> r = new ArrayList<>(); - for (int i = 0; i < dbNames.size(); i++) { - r.add(getLastCompletedTransactionForTable(dbNames.get(i), tableNames.get(i), txnsSnapshot)); - } - return r; - } - - /** - * Gets the information of the last transaction committed for the input table - * given the transaction snapshot provided. - */ - @Override - @RetrySemantics.ReadOnly - public BasicTxnInfo getLastCompletedTransactionForTable( - String inputDbName, String inputTableName, TxnsSnapshot txnsSnapshot) throws MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - stmt.setMaxRows(1); - String s = "select ctc_id, ctc_timestamp, ctc_txnid, ctc_database, ctc_table " - + "from COMPLETED_TXN_COMPONENTS " - + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName) - + " and ctc_txnid <= " + txnsSnapshot.getTxn_high_water_mark() - + (txnsSnapshot.getOpen_txns().isEmpty() ? - " " : " and ctc_txnid NOT IN(" + StringUtils.join(",", txnsSnapshot.getOpen_txns()) + ") ") - + "order by ctc_id desc"; - if (LOG.isDebugEnabled()) { - LOG.debug("Going to execute query <" + s + ">"); - } - rs = stmt.executeQuery(s); - if(!rs.next()) { - return new BasicTxnInfo(true); - } - final BasicTxnInfo txnInfo = new BasicTxnInfo(false); - txnInfo.setId(rs.getLong(1)); - txnInfo.setTime(rs.getTimestamp(2, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); - txnInfo.setTxnid(rs.getLong(3)); - txnInfo.setDbname(rs.getString(4)); - txnInfo.setTablename(rs.getString(5)); - return txnInfo; - } catch (SQLException ex) { - LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex); - throw new MetaException("Unable to retrieve commits information due to " + StringUtils.stringifyException(ex)); - } finally { - close(rs, stmt, dbConn); - } - } - - /** * Gets the information of the first transaction for the given table * after the transaction with the input id was committed (if any). */ @Override @RetrySemantics.ReadOnly public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( - String inputDbName, String inputTableName, long incrementalIdentifier) + String inputDbName, String inputTableName, ValidTxnList txnList) throws MetaException { + final List<Long> openTxns = Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions())); + Connection dbConn = null; Statement stmt = null; ResultSet rs = null; @@ -987,23 +932,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); stmt.setMaxRows(1); - String s = "select ctc_id, ctc_timestamp, ctc_txnid, ctc_database, ctc_table " + String s = "select ctc_timestamp, ctc_txnid, ctc_database, ctc_table " + "from COMPLETED_TXN_COMPONENTS " + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName) - + " and ctc_id > " + incrementalIdentifier + " order by ctc_id asc"; + + " and ctc_txnid > " + txnList.getHighWatermark() + + (txnList.getInvalidTransactions().length == 0 ? + " " : " or ctc_txnid IN(" + StringUtils.join(",", openTxns) + ") ") + + "order by ctc_timestamp asc"; if (LOG.isDebugEnabled()) { LOG.debug("Going to execute query <" + s + ">"); } rs = stmt.executeQuery(s); + if(!rs.next()) { return new BasicTxnInfo(true); } final BasicTxnInfo txnInfo = new BasicTxnInfo(false); - txnInfo.setId(rs.getLong(1)); - txnInfo.setTime(rs.getTimestamp(2, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); - txnInfo.setTxnid(rs.getLong(3)); - txnInfo.setDbname(rs.getString(4)); - txnInfo.setTablename(rs.getString(5)); + txnInfo.setTime(rs.getTimestamp(1, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); + txnInfo.setTxnid(rs.getLong(2)); + txnInfo.setDbname(rs.getString(3)); + txnInfo.setTablename(rs.getString(4)); return txnInfo; } catch (SQLException ex) { LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex); http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 42f90cd..3e27034 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.*; @@ -113,33 +114,14 @@ public interface TxnStore extends Configurable { throws NoSuchTxnException, TxnAbortedException, MetaException; /** - * Get the last transaction corresponding to given databases and tables. - * @return - * @throws MetaException - */ - @RetrySemantics.Idempotent - public List<BasicTxnInfo> getLastCompletedTransactionForTables( - List<String> dbNames, List<String> tableNames, TxnsSnapshot txnsSnapshot) - throws MetaException; - - /** - * Get the last transaction corresponding to given database and table. - * @return - * @throws MetaException - */ - @RetrySemantics.Idempotent - public BasicTxnInfo getLastCompletedTransactionForTable( - String inputDbName, String inputTableName, TxnsSnapshot txnsSnapshot) - throws MetaException; - - /** - * Get the first transaction corresponding to given database and table after incremental id. + * Get the first transaction corresponding to given database and table after transactions + * referenced in the transaction snapshot. * @return * @throws MetaException */ @RetrySemantics.Idempotent public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( - String inputDbName, String inputTableName, long id) + String inputDbName, String inputTableName, ValidTxnList txnList) throws MetaException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/resources/package.jdo ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo index 3da09a5..f408de5 100644 --- a/standalone-metastore/src/main/resources/package.jdo +++ b/standalone-metastore/src/main/resources/package.jdo @@ -25,8 +25,8 @@ Non-indexed VARCHAR: 4000 bytes (max length on Oracle 9i/10g/11g) --> -<jdo> - <package name="org.apache.hadoop.hive.metastore.model"> +<jdo> + <package name="org.apache.hadoop.hive.metastore.model"> <class name="MDatabase" identity-type="datastore" table="DBS" detachable="true"> <datastore-identity> <column name="DB_ID"/> @@ -185,17 +185,27 @@ <field name="tableType"> <column name="TBL_TYPE" length="128" jdbc-type="VARCHAR"/> </field> - <field name="creationMetadata" table="MV_CREATION_METADATA"> - <map key-type="java.lang.String" value-type="java.lang.String"/> + </class> + + <class name="MCreationMetadata" identity-type="datastore" table="MV_CREATION_METADATA" detachable="true"> + <datastore-identity> + <column name="MV_CREATION_METADATA_ID"/> + </datastore-identity> + <field name="dbName"> + <column name="DB_NAME" length="128" jdbc-type="VARCHAR"/> + </field> + <field name="tblName"> + <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/> + </field> + <field name="tables" table="MV_TABLES_USED"> + <collection element-type="MTable"/> <join> - <column name="TBL_ID"/> + <column name="MV_CREATION_METADATA_ID"/> </join> - <key> - <column name="TBL_NAME" length="180" jdbc-type="VARCHAR"/> - </key> - <value> - <column name="LAST_TRANSACTION_INFO" jdbc-type="LONGVARCHAR"/> - </value> + <element column="TBL_ID"/> + </field> + <field name="txnList"> + <column name="TXN_LIST" jdbc-type="CLOB" allows-null="true"/> </field> </class> http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/main/thrift/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift index 93f3e53..371b975 100644 --- a/standalone-metastore/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift @@ -327,7 +327,7 @@ struct Table { 13: optional PrincipalPrivilegeSet privileges, 14: optional bool temporary=false, 15: optional bool rewriteEnabled, // rewrite enabled or not - 16: optional map<string, BasicTxnInfo> creationMetadata // only for MVs, it stores table name used -> last modification before MV creation + 16: optional CreationMetadata creationMetadata // only for MVs, it stores table names used and txn list at MV creation } struct Partition { @@ -858,17 +858,18 @@ struct AddDynamicPartitions { struct BasicTxnInfo { 1: required bool isnull, - 2: optional i64 id, - 3: optional i64 time, - 4: optional i64 txnid, - 5: optional string dbname, - 6: optional string tablename, - 7: optional string partitionname + 2: optional i64 time, + 3: optional i64 txnid, + 4: optional string dbname, + 5: optional string tablename, + 6: optional string partitionname } -struct TxnsSnapshot { - 1: required i64 txn_high_water_mark, - 2: required list<i64> open_txns +struct CreationMetadata { + 1: required string dbName, + 2: required string tblName, + 3: required set<string> tablesUsed, + 4: optional string validTxnList } struct NotificationEventRequest { @@ -1815,8 +1816,6 @@ service ThriftHiveMetastore extends fb303.FacebookService CompactionResponse compact2(1:CompactionRequest rqst) ShowCompactResponse show_compact(1:ShowCompactRequest rqst) void add_dynamic_partitions(1:AddDynamicPartitions rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2) - list<BasicTxnInfo> get_last_completed_transaction_for_tables(1:list<string> db_names, 2:list<string> table_names, 3:TxnsSnapshot txns_snapshot) - BasicTxnInfo get_last_completed_transaction_for_table(1:string db_name, 2:string table_name, 3:TxnsSnapshot txns_snapshot) // Notification logging calls NotificationEventResponse get_next_notification(1:NotificationEventRequest rqst) http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index b9a8f61..bd61df6 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -225,7 +225,6 @@ public class TestCachedStore { Table tbl1 = new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams, null, null, TableType.MANAGED_TABLE.toString()); - tbl1.setCreationMetadata(new HashMap<String, BasicTxnInfo>()); cachedStore.createTable(tbl1); tbl1 = cachedStore.getTable(dbName, tblName1); http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java index 71cac2f..dcf1eb7 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestGetTableMeta.java @@ -23,8 +23,10 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.CreationMetadata; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; @@ -149,7 +151,9 @@ public class TestGetTableMeta { if (type == TableType.MATERIALIZED_VIEW) { - table.setCreationMetadata(new HashMap<>()); + CreationMetadata cm = new CreationMetadata( + dbName, tableName, ImmutableSet.of()); + table.setCreationMetadata(cm); } if (type == TableType.EXTERNAL_TABLE) { http://git-wip-us.apache.org/repos/asf/hive/blob/0a328f03/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java index abc400a..00f38ee 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestTablesCreateDropAlterTruncate.java @@ -286,7 +286,7 @@ public class TestTablesCreateDropAlterTruncate { Assert.assertNull("Comparing ViewOriginalText", createdTable.getViewOriginalText()); Assert.assertNull("Comparing ViewExpandedText", createdTable.getViewExpandedText()); Assert.assertEquals("Comparing TableType", "MANAGED_TABLE", createdTable.getTableType()); - Assert.assertTrue("Creation metadata should be empty", createdTable.getCreationMetadata().isEmpty()); + Assert.assertTrue("Creation metadata should be empty", createdTable.getCreationMetadata() == null); // Storage Descriptor data StorageDescriptor createdSd = createdTable.getSd();
