Repository: hive Updated Branches: refs/heads/master dcd9b5941 -> be420098f
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a79242b..9256b7a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -63,9 +63,11 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache; +import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockHandler; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; @@ -854,9 +856,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { + MaterializationsRebuildLockHandler materializationsRebuildLockHandler = + MaterializationsRebuildLockHandler.get(); + String fullyQualifiedName = null; + String dbName = null; + String tblName = null; + long writeId = 0L; + long timestamp = 0L; + boolean isUpdateDelete = false; long txnid = rqst.getTxnid(); long sourceTxnId = -1; + try { Connection dbConn = null; Statement stmt = null; @@ -905,6 +916,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix)); if (rs.next()) { + isUpdateDelete = true; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict /** @@ -1007,6 +1019,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid)); } + // Obtain information that we need to update registry + s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; + LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + dbName = rs.getString(1); + tblName = rs.getString(2); + fullyQualifiedName = Warehouse.getQualifiedName(dbName, tblName); + writeId = rs.getLong(3); + timestamp = rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime(); + } s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); modCount = stmt.executeUpdate(s); @@ -1021,16 +1044,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { modCount = stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); - // Update registry with modifications - s = "select ctc_database, ctc_table, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid; - rs = stmt.executeQuery(s); - if (rs.next()) { - LOG.debug("Going to register table modification in invalidation cache <" + s + ">"); - MaterializationsInvalidationCache.get().notifyTableModification( - rs.getString(1), rs.getString(2), txnid, - rs.getTimestamp(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()); - } - if (rqst.isSetReplPolicy()) { s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); @@ -1043,10 +1056,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, dbConn, sqlGenerator)); } - close(rs); - + MaterializationsInvalidationCache materializationsInvalidationCache = + MaterializationsInvalidationCache.get(); + if (materializationsInvalidationCache.containsMaterialization(dbName, tblName) && + !materializationsRebuildLockHandler.readyToCommitResource(dbName, tblName, txnid)) { + throw new MetaException( + "Another process is rebuilding the materialized view " + fullyQualifiedName); + } LOG.debug("Going to commit"); + close(rs); dbConn.commit(); + + // Update registry with modifications + materializationsInvalidationCache.notifyTableModification( + dbName, tblName, writeId, timestamp, isUpdateDelete); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -1057,6 +1080,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { close(commitIdRs); close(lockHandle, stmt, dbConn); unlockInternal(); + if (fullyQualifiedName != null) { + materializationsRebuildLockHandler.unlockResource(dbName, tblName, txnid); + } } } catch (RetryException e) { commitTxn(rqst); @@ -1384,9 +1410,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { @Override @RetrySemantics.ReadOnly public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( - String inputDbName, String inputTableName, ValidTxnList txnList) + String inputDbName, String inputTableName, ValidWriteIdList txnList) throws MetaException { - final List<Long> openTxns = Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions())); + final List<Long> openTxns = Arrays.asList(ArrayUtils.toObject(txnList.getInvalidWriteIds())); Connection dbConn = null; Statement stmt = null; @@ -1395,12 +1421,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); stmt.setMaxRows(1); - String s = "select ctc_timestamp, ctc_txnid, ctc_database, ctc_table " + String s = "select ctc_timestamp, ctc_writeid, ctc_database, ctc_table " + "from COMPLETED_TXN_COMPONENTS " + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName) - + " and ctc_txnid > " + txnList.getHighWatermark() - + (txnList.getInvalidTransactions().length == 0 ? - " " : " or ctc_txnid IN(" + StringUtils.join(",", openTxns) + ") ") + + " and ctc_writeid > " + txnList.getHighWatermark() + + (txnList.getInvalidWriteIds().length == 0 ? + " " : " or ctc_writeid IN(" + StringUtils.join(",", openTxns) + ") ") + "order by ctc_timestamp asc"; if (LOG.isDebugEnabled()) { LOG.debug("Going to execute query <" + s + ">"); http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index e72d327..6d8b845 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.*; @@ -123,7 +124,7 @@ public interface TxnStore extends Configurable { */ @RetrySemantics.Idempotent public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( - String inputDbName, String inputTableName, ValidTxnList txnList) + String inputDbName, String inputTableName, ValidWriteIdList txnList) throws MetaException; /** * Gets the list of valid write ids for the given table wrt to current txn http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/thrift/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift index 7450439..612afe1 100644 --- a/standalone-metastore/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift @@ -1227,7 +1227,8 @@ struct TableMeta { struct Materialization { 1: required set<string> tablesUsed; 2: optional string validTxnList - 3: required i64 invalidationTime; + 3: optional i64 invalidationTime; + 4: optional bool sourceTablesUpdateDeleteModified; } // Data types for workload management. @@ -2163,6 +2164,8 @@ service ThriftHiveMetastore extends fb303.FacebookService void add_serde(1: SerDeInfo serde) throws(1:AlreadyExistsException o1, 2:MetaException o2) SerDeInfo get_serde(1: GetSerdeRequest rqst) throws(1:NoSuchObjectException o1, 2:MetaException o2) + LockResponse get_lock_materialization_rebuild(1: string dbName, 2: string tableName, 3: i64 txnId) + bool heartbeat_lock_materialization_rebuild(1: string dbName, 2: string tableName, 3: i64 txnId) } // * Note about the DDL_TIME: When creating or altering a table or a partition, http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java index 7d37262..ecddc7b 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java @@ -3321,4 +3321,16 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos public SerDeInfo getSerDe(String serDeName) throws TException { throw new UnsupportedOperationException(); } + + /** {@inheritDoc} */ + @Override + public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override + public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java index 7a871e1..8debcce 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java @@ -65,16 +65,16 @@ public class TestMetaStoreMaterializationsCacheCleaner { // This is a dummy test, invalidation cache is not supposed to // record any information. MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 1, 1); + DB_NAME, TBL_NAME_1, 1, 1, false); int id = 2; BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, id, id); + DB_NAME, TBL_NAME_1, id, id, false); // Create tbl2 (nothing to do) id = 3; BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, id, id); + DB_NAME, TBL_NAME_2, id, id, false); // Cleanup (current = 4, duration = 4) -> Does nothing long removed = MaterializationsInvalidationCache.get().cleanup(0L); Assert.assertEquals(0L, removed); @@ -92,6 +92,7 @@ public class TestMetaStoreMaterializationsCacheCleaner { when(mv1.getCreationMetadata()).thenReturn(mockCM1); MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(), mockCM1.getTablesUsed(), mockCM1.getValidTxnList()); + // Format <txnId>$<table_name>:<hwm>:<minOpenWriteId>:<open_writeids>:<abort_writeids>$<table_name> Map<String, Materialization> invalidationInfos = MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( DB_NAME, ImmutableList.of(MV_NAME_1)); @@ -99,11 +100,11 @@ public class TestMetaStoreMaterializationsCacheCleaner { id = 10; BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, id, id); + DB_NAME, TBL_NAME_2, id, id, false); id = 9; BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, id, id); + DB_NAME, TBL_NAME_1, id, id, false); // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3 removed = MaterializationsInvalidationCache.get().cleanup(8L); Assert.assertEquals(0L, removed); @@ -132,15 +133,15 @@ public class TestMetaStoreMaterializationsCacheCleaner { Assert.assertTrue(invalidationInfos.isEmpty()); // Create tbl3 (nothing to do) MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_3, 11, 11); + DB_NAME, TBL_NAME_3, 11, 11, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_3, 18, 18); + DB_NAME, TBL_NAME_3, 18, 18, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 14, 14); + DB_NAME, TBL_NAME_1, 14, 14, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 17, 17); + DB_NAME, TBL_NAME_1, 17, 17, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 16, 16); + DB_NAME, TBL_NAME_2, 16, 16, false); // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11 removed = MaterializationsInvalidationCache.get().cleanup(16L); Assert.assertEquals(0L, removed); @@ -149,11 +150,11 @@ public class TestMetaStoreMaterializationsCacheCleaner { DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); Assert.assertTrue(invalidationInfos.isEmpty()); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 12, 12); + DB_NAME, TBL_NAME_1, 12, 12, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 15, 15); + DB_NAME, TBL_NAME_2, 15, 15, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 7, 7); + DB_NAME, TBL_NAME_2, 7, 7, false); invalidationInfos = MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2)); @@ -205,16 +206,16 @@ public class TestMetaStoreMaterializationsCacheCleaner { // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18 // Create tbl1 (nothing to do) MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 1, 1); + DB_NAME, TBL_NAME_1, 1, 1, false); int id = 2; BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, id, id); + DB_NAME, TBL_NAME_1, id, id, false); // Create tbl2 (nothing to do) id = 3; BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, id, id); + DB_NAME, TBL_NAME_2, id, id, false); // Cleanup (current = 4, duration = 4) -> Does nothing long removed = MaterializationsInvalidationCache.get().cleanup(0L); Assert.assertEquals(0L, removed); @@ -228,7 +229,8 @@ public class TestMetaStoreMaterializationsCacheCleaner { DB_NAME + "." + TBL_NAME_1, DB_NAME + "." + TBL_NAME_2)); // Create txn list (highWatermark=4;minOpenTxn=Long.MAX_VALUE) - mockCM1.setValidTxnList("3:" + Long.MAX_VALUE + "::"); + mockCM1.setValidTxnList("3$" + DB_NAME + "." + TBL_NAME_1 + ":3:" + Long.MAX_VALUE + "::" + + "$" + DB_NAME + "." + TBL_NAME_2 + ":3:" + Long.MAX_VALUE + "::"); when(mv1.getCreationMetadata()).thenReturn(mockCM1); MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(), mockCM1.getTablesUsed(), mockCM1.getValidTxnList()); @@ -239,11 +241,11 @@ public class TestMetaStoreMaterializationsCacheCleaner { id = 10; BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, id, id); + DB_NAME, TBL_NAME_2, id, id, false); id = 9; BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, id, id); + DB_NAME, TBL_NAME_1, id, id, false); // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3 removed = MaterializationsInvalidationCache.get().cleanup(8L); Assert.assertEquals(3L, removed); @@ -261,7 +263,8 @@ public class TestMetaStoreMaterializationsCacheCleaner { DB_NAME + "." + TBL_NAME_1, DB_NAME + "." + TBL_NAME_2)); // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE) - mockCM2.setValidTxnList("10:" + Long.MAX_VALUE + "::"); + mockCM2.setValidTxnList("10$" + DB_NAME + "." + TBL_NAME_1 + ":10:" + Long.MAX_VALUE + "::" + + "$" + DB_NAME + "." + TBL_NAME_2 + ":10:" + Long.MAX_VALUE + "::"); when(mv2.getCreationMetadata()).thenReturn(mockCM2); MaterializationsInvalidationCache.get().createMaterializedView(mockCM2.getDbName(), mockCM2.getTblName(), mockCM2.getTablesUsed(), mockCM2.getValidTxnList()); @@ -273,15 +276,15 @@ public class TestMetaStoreMaterializationsCacheCleaner { Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); // Create tbl3 (nothing to do) MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_3, 11, 11); + DB_NAME, TBL_NAME_3, 11, 11, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_3, 18, 18); + DB_NAME, TBL_NAME_3, 18, 18, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 14, 14); + DB_NAME, TBL_NAME_1, 14, 14, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 17, 17); + DB_NAME, TBL_NAME_1, 17, 17, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 16, 16); + DB_NAME, TBL_NAME_2, 16, 16, false); // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11 removed = MaterializationsInvalidationCache.get().cleanup(16L); Assert.assertEquals(2L, removed); @@ -291,11 +294,11 @@ public class TestMetaStoreMaterializationsCacheCleaner { Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime()); Assert.assertEquals(14L, invalidationInfos.get(MV_NAME_2).getInvalidationTime()); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_1, 12, 12); + DB_NAME, TBL_NAME_1, 12, 12, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 15, 15); + DB_NAME, TBL_NAME_2, 15, 15, false); MaterializationsInvalidationCache.get().notifyTableModification( - DB_NAME, TBL_NAME_2, 7, 7); + DB_NAME, TBL_NAME_2, 7, 7, false); invalidationInfos = MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo( DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));