HIVE-13213 make DbLockManger work for non-acid resources (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bfc24963 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bfc24963 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bfc24963 Branch: refs/heads/branch-1 Commit: bfc249632378c1b9c12c059c817b2c6227c7e0e7 Parents: 0780218 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue May 3 14:52:50 2016 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue May 3 14:52:50 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/metastore/txn/TxnHandler.java | 5 ++ .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 +++ .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 ++++++ .../hive/ql/lockmgr/TestDbTxnManager2.java | 81 ++++++++++++++++++++ 4 files changed, 120 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index ffd450a..f7ef88e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -89,6 +89,11 @@ import java.util.regex.Pattern; * If we ever decide to run remote Derby server, according to * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be * seriazlied, so that would also work though has not been tested. + * + * General design note: + * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is + * still valid and active. In the code this is usually achieved at the same time the txn record + * is locked for some operation. */ @InterfaceAudience.Private @InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index a9867ef..28ee8a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; @@ -213,6 +214,17 @@ public class DbTxnManager extends HiveTxnManagerImpl { break; case INSERT: + t = output.getTable(); + if(t == null) { + throw new IllegalStateException("No table info for " + output); + } + if(AcidUtils.isAcidTable(t)) { + compBuilder.setShared(); + } + else { + compBuilder.setExclusive(); + } + break; case DDL_SHARED: compBuilder.setShared(); break; http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 1bddecb..a901074 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -433,6 +433,28 @@ public class TestTxnCommands2 { } /** + * Test update that hits multiple partitions (i.e. requries dynamic partition insert to process) + * @throws Exception + */ + @Test + public void updateDeletePartitioned() throws Exception { + int[][] tableData = {{1,2},{3,4},{5,6}}; + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData)); + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData)); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR)); + runWorker(hiveConf); + runCleaner(hiveConf); + runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3"); + txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR)); + runWorker(hiveConf); + runCleaner(hiveConf); + List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + int[][] expectedData = {{1,1,2},{1,3,5},{1,5,6},{2,1,2},{2,3,5},{2,5,6}}; + Assert.assertEquals("Update " + Table.ACIDTBLPART + " didn't match:", stringifyValues(expectedData), rs); + } + + /** * https://issues.apache.org/jira/browse/HIVE-10151 */ @Test http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 42c7064..0e2bfc0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -524,6 +524,87 @@ public class TestDbTxnManager2 { Assert.assertEquals(0, count); } + /** + * collection of queries where we ensure that we get the locks that are expected + * @throws Exception + */ + @Test + public void checkExpectedLocks() throws Exception { + CommandProcessorResponse cpr = null; + cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc"); + checkCmdOnDriver(cpr); + + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)"); + checkCmdOnDriver(cpr); + LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + List<ShowLocksResponseElement> locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__1", null, locks.get(0)); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks.get(1)); + List<HiveLock> relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + + cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)"); + checkCmdOnDriver(cpr); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__2", null, locks.get(0)); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks.get(1)); + relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + + cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)"); + checkCmdOnDriver(cpr); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__3", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks.get(1)); + relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + + cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)"); + checkCmdOnDriver(cpr); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__4", null, locks.get(0)); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks.get(1)); + relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + + cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1"); + checkCmdOnDriver(cpr); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0)); + relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + + cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1"); + checkCmdOnDriver(cpr); + lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false); + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));//https://issues.apache.org/jira/browse/HIVE-13212 + relLocks = new ArrayList<HiveLock>(2); + relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); + txnMgr.getLockManager().releaseLocks(relLocks); + } + private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) { Assert.assertEquals(l.toString(),l.getType(), type); Assert.assertEquals(l.toString(),l.getState(), state);