Author: hashutosh
Date: Sat Jan 19 05:41:11 2013
New Revision: 1435492

URL: http://svn.apache.org/viewvc?rev=1435492&view=rev
Log:
HIVE-3537 : release locks at the end of move tasks (Namit via Ashutosh Chauhan)
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Sat Jan 19 05:41:11 2013
@@ -42,8 +42,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 
@@ -88,6 +91,12 @@ public class Context {
 
   private boolean needLockMgr;
 
+  // Keep track of the mapping from load table desc to the output and the lock
+  private final Map<LoadTableDesc, WriteEntity> loadTableOutputMap =
+      new HashMap<LoadTableDesc, WriteEntity>();
+  private final Map<WriteEntity, List<HiveLockObj>> outputLockObjects =
+      new HashMap<WriteEntity, List<HiveLockObj>>();
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -109,6 +118,15 @@ public class Context {
         executionId).toUri().getPath();
   }
 
+  public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
+    return loadTableOutputMap;
+  }
+
+  public Map<WriteEntity, List<HiveLockObj>> getOutputLockObjects() {
+    return outputLockObjects;
+  }
+
   /**
    * Set the context on whether the current query is an explain query.
    * @param value true if the query is an explain query, false if not

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Jan 19 05:41:11 2013
@@ -785,15 +785,21 @@ public class Driver implements CommandPr
     }
 
     for (WriteEntity output : plan.getOutputs()) {
+      List<HiveLockObj> lockObj = null;
       if (output.getTyp() == WriteEntity.Type.TABLE) {
-        lockObjects.addAll(getLockObjects(output.getTable(), null,
-            output.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED));
+        lockObj = getLockObjects(output.getTable(), null,
+            output.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED);
       } else if (output.getTyp() == WriteEntity.Type.PARTITION) {
-        lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE));
+        lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE);
       }
       // In case of dynamic queries, it is possible to have incomplete dummy partitions
       else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) {
-        lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.SHARED));
+        lockObj = getLockObjects(null, output.getPartition(), HiveLockMode.SHARED);
+      }
+
+      if(lockObj != null) {
+        lockObjects.addAll(lockObj);
+        ctx.getOutputLockObjects().put(output, lockObj);
       }
     }
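
Taken together, the Context and Driver changes above record, for every WriteEntity the query writes to, the exact lock objects that were requested for it, and they expose the LoadTableDesc-to-WriteEntity mapping that the move tasks need later. A rough sketch of the lookup chain (illustrative only: the helper class and method are invented, but the accessors are the ones added in this patch):

    import java.util.List;

    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;

    public class LockBookkeepingSketch {
      // Walk from a load descriptor to the lock objects the Driver recorded for
      // its output; either step may return null if nothing was registered.
      static List<HiveLockObj> locksFor(Context ctx, LoadTableDesc ltd) {
        WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
        return (output == null) ? null : ctx.getOutputLockObjects().get(output);
      }
    }

MoveTask.releaseLocks() below performs this same walk before deciding which locks to drop.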

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Jan 19 05:41:11 2013
@@ -40,6 +40,9 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -146,6 +149,41 @@ public class MoveTask extends Task<MoveW
     return deletePath;
   }
 
+  // Release all the locks acquired for this object.
+  // This becomes important for multi-table inserts when one branch may take much more
+  // time than the others. It is better to release the lock for this particular insert.
+  // The other option is to wait for all the branches to finish, or set
+  // hive.multi.insert.move.tasks.share.dependencies to true, which will mean that the
+  // first multi-insert results will be available only when all of the branches of
+  // multi-table inserts are done.
+  private void releaseLocks(LoadTableDesc ltd) throws HiveException {
+    // nothing needs to be done
+    if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) {
+      return;
+    }
+
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
+    List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);
+    if (lockObjects == null) {
+      return;
+    }
+
+    for (HiveLockObj lockObj : lockObjects) {
+      List<HiveLock> locks = lockMgr.getLocks(lockObj.getObj(), false, true);
+      for (HiveLock lock : locks) {
+        if (lock.getHiveLockMode() == lockObj.getMode()) {
+          LOG.info("about to release lock for output: " + output.toString() +
+              " lock: " + lock.getHiveLockObject().getName());
+          lockMgr.unlock(lock);
+          ctx.getHiveLocks().remove(lock);
+        }
+      }
+    }
+  }
+
+  @Override
   public int execute(DriverContext driverContext) {
 
@@ -317,6 +355,7 @@ public class MoveTask extends Task<MoveW
           SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
               table.getCols());
         }
+        releaseLocks(tbd);
       }
 
       return 0;
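
releaseLocks() only drops a lock whose mode matches the mode that was requested for this output, so a lock held on the same object under a different mode is left untouched, and it removes each released lock from ctx.getHiveLocks(), which keeps the query's final lock release from trying to unlock it a second time. A minimal sketch of that mode filter in isolation (the class and method names are invented for illustration; the getters are the ones used above):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
    import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;

    public class LockModeFilterSketch {
      // Of the locks currently held on one lock object, keep only those whose
      // mode equals the mode originally requested for this output.
      static List<HiveLock> matchingMode(List<HiveLock> held, HiveLockObj requested) {
        List<HiveLock> result = new ArrayList<HiveLock>();
        for (HiveLock lock : held) {
          if (lock.getHiveLockMode() == requested.getMode()) {
            result.add(lock);
          }
        }
        return result;
      }
    }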

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java Sat Jan 19 05:41:11 2013
@@ -438,10 +438,12 @@ public class EmbeddedLockManager impleme
       this.lockMode = lockMode;
     }
 
+    @Override
     public HiveLockObject getHiveLockObject() {
       return lockObj;
     }
 
+    @Override
     public HiveLockMode getHiveLockMode() {
       return lockMode;
     }
@@ -450,5 +452,16 @@ public class EmbeddedLockManager impleme
     public String toString() {
       return lockMode + "=" + lockObj.getDisplayName() + "(" + lockObj.getData() + ")";
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof SimpleHiveLock)) {
+        return false;
+      }
+
+      SimpleHiveLock simpleLock = (SimpleHiveLock) o;
+      return lockObj.equals(simpleLock.getHiveLockObject()) &&
+          lockMode == simpleLock.getHiveLockMode();
+    }
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java Sat Jan 19 05:41:11 2013
@@ -18,27 +18,27 @@
 
 package org.apache.hadoop.hive.ql.lockmgr;
 
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 public class HiveLockObject {
 
-  String [] pathNames = null;
+  String[] pathNames = null;
 
   public static class HiveLockObjectData {
 
-    private String queryId; // queryId of the command
+    private String queryId;  // queryId of the command
     private String lockTime; // time at which lock was acquired
     // mode of the lock: EXPLICIT(lock command)/IMPLICIT(query)
     private String lockMode;
     private String queryStr;
-    private String clientIp; 
+    private String clientIp;
 
     public HiveLockObjectData(String queryId,
-                              String lockTime,
-                              String lockMode,
-                              String queryStr) {
-      this.queryId  = queryId;
+        String lockTime,
+        String lockMode,
+        String queryStr) {
+      this.queryId = queryId;
       this.lockTime = lockTime;
       this.lockMode = lockMode;
       this.queryStr = queryStr.trim();
@@ -51,7 +51,7 @@ public class HiveLockObject {
       }
 
       String[] elem = data.split(":");
-      queryId  = elem[0];
+      queryId = elem[0];
       lockTime = elem[1];
       lockMode = elem[2];
       queryStr = elem[3];
@@ -73,18 +73,40 @@ public class HiveLockObject {
       return queryStr;
     }
 
+    @Override
     public String toString() {
       return queryId + ":" + lockTime + ":" + lockMode + ":" + queryStr + ":" + clientIp;
     }
-
+
     public String getClientIp() {
       return this.clientIp;
    }
-
    public void setClientIp(String clientIp) {
      this.clientIp = clientIp;
    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof HiveLockObjectData)) {
+        return false;
+      }
+
+      HiveLockObjectData target = (HiveLockObjectData) o;
+      boolean ret = (queryId == null) ? target.getQueryId() == null :
+          queryId.equals(target.getQueryId());
+      ret = ret && ((lockTime == null) ? target.getLockTime() == null :
+          lockTime.equals(target.getLockTime()));
+      ret = ret && ((lockMode == null) ? target.getLockMode() == null :
+          lockMode.equals(target.getLockMode()));
+      ret = ret && ((queryStr == null) ? target.getQueryStr() == null :
+          queryStr.equals(target.getQueryStr()));
+      ret = ret && ((clientIp == null) ? target.getClientIp() == null :
+          clientIp.equals(target.getClientIp()));
+
+      return ret;
+    }
   }
 
   /* user supplied data for that object */
@@ -110,12 +132,12 @@ public class HiveLockObject {
   }
 
   public HiveLockObject(Partition par, HiveLockObjectData lockData) {
-    this(new String[] { par.getTable().getDbName(),
-        par.getTable().getTableName(), par.getName() }, lockData);
+    this(new String[] {par.getTable().getDbName(),
+        par.getTable().getTableName(), par.getName()}, lockData);
   }
 
   public HiveLockObject(DummyPartition par, HiveLockObjectData lockData) {
-    this(new String[] { par.getName() }, lockData);
+    this(new String[] {par.getName()}, lockData);
   }
 
   public String[] getPaths() {
@@ -171,4 +193,14 @@ public class HiveLockObject {
     this.data = data;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof HiveLockObject)) {
+      return false;
+    }
+
+    HiveLockObject tgt = (HiveLockObject) o;
+    return getName().equals(tgt.getName()) &&
+        (data == null ? tgt.getData() == null : data.equals(tgt.getData()));
+  }
 }
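
The field-by-field null handling in the two equals() methods above is repetitive and easy to get wrong; a small null-safe comparison helper keeps each clause to one call (a sketch only, not part of this change; on Java 7 and later, java.util.Objects.equals(a, b) provides the same behavior):

    public final class NullSafeSketch {
      private NullSafeSketch() {
      }

      // True when both values are null, or when they are equal per equals().
      public static boolean eq(Object a, Object b) {
        return (a == null) ? (b == null) : a.equals(b);
      }
    }

With such a helper, HiveLockObjectData.equals() reduces to a conjunction of eq(queryId, target.getQueryId()), eq(lockTime, target.getLockTime()), and so on.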

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java Sat Jan 19 05:41:11 2013
@@ -41,6 +41,7 @@ public class ZooKeeperHiveLock extends H
     this.path = path;
   }
 
+  @Override
   public HiveLockObject getHiveLockObject() {
     return obj;
   }
@@ -49,6 +50,7 @@ public class ZooKeeperHiveLock extends H
     this.obj = obj;
   }
 
+  @Override
   public HiveLockMode getHiveLockMode() {
     return mode;
   }
@@ -56,4 +58,17 @@ public class ZooKeeperHiveLock extends H
   public void setHiveLockMode(HiveLockMode mode) {
     this.mode = mode;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ZooKeeperHiveLock)) {
+      return false;
+    }
+
+    ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)o;
+
+    return path.equals(zLock.getPath()) &&
+        obj.equals(zLock.getHiveLockObject()) &&
+        mode == zLock.getHiveLockMode();
+  }
 }
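
None of the classes gaining equals() in this revision (SimpleHiveLock, HiveLockObjectData, HiveLockObject, ZooKeeperHiveLock) override hashCode() here; if these objects are ever used as keys in hash-based collections, a hashCode() built from the same fields is needed to keep the equals/hashCode contract. A minimal sketch for ZooKeeperHiveLock, meant to sit next to the equals() above (illustrative only, not part of this commit):

      @Override
      public int hashCode() {
        // Combine the same fields that equals() compares: path, lock object, mode.
        int result = (path == null) ? 0 : path.hashCode();
        result = 31 * result + ((obj == null) ? 0 : obj.hashCode());
        result = 31 * result + ((mode == null) ? 0 : mode.hashCode());
        return result;
      }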

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Sat Jan 19 05:41:11 2013
@@ -18,43 +18,42 @@
 
 package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
 
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import java.io.IOException;
 import java.net.InetAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Queue;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.Comparator;
-import java.util.Collections;
-import java.util.regex.Pattern;
+import java.util.Queue;
 import java.util.regex.Matcher;
-import org.apache.zookeeper.KeeperException;
+import java.util.regex.Pattern;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 
 public class ZooKeeperHiveLockManager implements HiveLockManager {
   HiveLockManagerCtx ctx;
@@ -73,7 +72,16 @@ public class ZooKeeperHiveLockManager im
   private int numRetriesForLock;
   private int numRetriesForUnLock;
 
-  private String clientIp;
+  private static String clientIp;
+
+  static {
+    clientIp = "UNKNOWN";
+    try {
+      InetAddress clientAddr = InetAddress.getLocalHost();
+      clientIp = clientAddr.getHostAddress();
+    } catch (Exception e1) {
+    }
+  }
 
   public ZooKeeperHiveLockManager() {
   }
@@ -102,12 +110,6 @@ public class ZooKeeperHiveLockManager im
     sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000;
     numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
-    clientIp = "UNKNOWN";
-    try {
-      InetAddress clientAddr = InetAddress.getLocalHost();
-      clientIp = clientAddr.getHostAddress();
-    } catch (Exception e1) {
-    }
 
     try {
       renewZookeeperInstance(sessionTimeout, quorumServers);
@@ -248,6 +250,7 @@ public class ZooKeeperHiveLockManager im
     for (int pos = len-1; pos >= 0; pos--) {
       HiveLock hiveLock = hiveLocks.get(pos);
       try {
+        LOG.info(" about to release lock for " + hiveLock.getHiveLockObject().getName());
         unlock(hiveLock);
       } catch (LockException e) {
        // The lock may have been released. Ignore and continue
@@ -567,6 +570,7 @@ public class ZooKeeperHiveLockManager im
     if (fetchData) {
       try {
         data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null)));
+        data.setClientIp(clientIp);
       } catch (Exception e) {
         LOG.error("Error in getting data for " + curChild + " " + e);
         // ignore error
@@ -596,8 +600,9 @@ public class ZooKeeperHiveLockManager im
   private void checkRedundantNode(String node) {
     try {
       // Nothing to do if it is a lock mode
-      if (getLockMode(ctx.getConf(), node) != null)
+      if (getLockMode(ctx.getConf(), node) != null) {
         return;
+      }
 
       List<String> children = zooKeeper.getChildren(node, false);
       for (String child : children) {
@@ -713,8 +718,9 @@ public class ZooKeeperHiveLockManager im
     Matcher shMatcher = shMode.matcher(path);
     Matcher exMatcher = exMode.matcher(path);
 
-    if (shMatcher.matches())
+    if (shMatcher.matches()) {
       return HiveLockMode.SHARED;
+    }
 
     if (exMatcher.matches()) {
       return HiveLockMode.EXCLUSIVE;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1435492&r1=1435491&r2=1435492&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sat Jan 19 05:41:11 2013
@@ -4846,18 +4846,23 @@ public class SemanticAnalyzer extends Ba
         loadTableWork.add(ltd);
       }
 
+      WriteEntity output = null;
+
       // Here only register the whole table for post-exec hook if no DP present
       // in the case of DP, we will register WriteEntity in MoveTask when the
       // list of dynamically created partitions are known.
-      if ((dpCtx == null || dpCtx.getNumDPCols() == 0) &&
-          !outputs.add(new WriteEntity(dest_tab))) {
-        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-            .getMsg(dest_tab.getTableName()));
+      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
+        output = new WriteEntity(dest_tab);
+        if (!outputs.add(output)) {
+          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+              .getMsg(dest_tab.getTableName()));
+        }
       }
       if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
         // No static partition specified
         if (dpCtx.getNumSPCols() == 0) {
-          outputs.add(new WriteEntity(dest_tab, false));
+          output = new WriteEntity(dest_tab, false);
+          outputs.add(output);
         }
         // part of the partition specified
         // Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -4870,13 +4875,15 @@ public class SemanticAnalyzer extends Ba
                 new DummyPartition(dest_tab, dest_tab.getDbName()
                     + "@" + dest_tab.getTableName() + "@" + ppath,
                     partSpec);
-            outputs.add(new WriteEntity(p, false));
+            output = new WriteEntity(p, false);
+            outputs.add(output);
           } catch (HiveException e) {
             throw new SemanticException(e.getMessage(), e);
           }
         }
       }
 
+      ctx.getLoadTableOutputMap().put(ltd, output);
       break;
     }
     case QBMetaData.DEST_PARTITION: {