Hi Ning,

1) You are right about this in general. But here keepAliveTime has little significance, because the core pool size and the maximum pool size are both nThreads. So even when the threads are idle, nThreads threads always remain in the pool that was created to process the tasks. And since in this scenario the thread pool is created for a specific, short-lived purpose, that configuration is fine.
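To make sure we are talking about the same thing, here is a small, self-contained sketch (illustrative only, not part of the patch; the class name FixedPoolSketch and the dummy task are made up). With corePoolSize == maximumPoolSize == nThreads, the keepAliveTime passed to ThreadPoolExecutor only matters if allowCoreThreadTimeOut(true) is called, and Executors.newFixedThreadPool(nThreads) builds exactly that kind of pool. It also shows the submit / Future.get() / shutdown pattern I describe in (2) below:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {
  public static void main(String[] args) throws Exception {
    int nThreads = 4;

    // These two pools behave the same: core == max == nThreads, so idle threads
    // are never reclaimed and the keepAliveTime (0 here, 5 sec in the earlier
    // draft) has no effect unless allowCoreThreadTimeOut(true) is set.
    ExecutorService explicitPool = new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    explicitPool.shutdown(); // only created to show the equivalence

    ExecutorService exeService = Executors.newFixedThreadPool(nThreads);
    List<Future<?>> futures = new ArrayList<Future<?>>(nThreads);
    try {
      for (int i = 0; i < nThreads; i++) {
        final int taskId = i;
        futures.add(exeService.submit(new Runnable() {
          public void run() {
            // stand-in for the real per-thread metastore update work
            System.out.println("task " + taskId + " done");
          }
        }));
      }
      // wait for all tasks; get() rethrows any task failure as ExecutionException
      for (Future<?> f : futures) {
        f.get();
      }
    } finally {
      // local, single-use pool, so always shut it down, exception or not
      exeService.shutdown();
    }
  }
}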
The same pool can be created more simply as:

    ExecutorService exeService = Executors.newFixedThreadPool(nThreads);

and I'll use this in the new patch. Further, if we move to a common or centralized thread pool (which we will need to do in the future), its configuration has to be chosen carefully, taking into account the number of cores available on a particular machine and the profiling results, but that is for later.

2) In the current scenario we need to call exeService.shutdown() in any case, whether an exception is thrown or not, because it is a local thread pool and we won't be using it any further. If the pool were a common/centralized one, we would not have to call shutdown().

Please let me know if this is fine; then I'll upload the patch attached with this mail to the JIRA.

Thanks,
ಕರಿಯ

On Fri, Mar 4, 2011 at 12:44 AM, Ning Zhang <nzh...@fb.com> wrote:

> Hi MIS,
>
> Thanks for the contribution! To allow a broader audience to review, can you
> upload your patch to the JIRA and the review board (I can help you with the
> review board if it doesn't allow you to change the request)?
>
> A couple of comments before uploading your patch:
>
> 1) the 5 sec keepAliveTime seems low. If the # of threads is more than
> the # of cores, does it mean a thread will be terminated after 5 secs of
> waiting to get scheduled?
>
> 2) do you need to call execService.shutdown() in case a Throwable is
> caught?
>
> On Mar 3, 2011, at 10:09 AM, MIS wrote:
>
> Hi, Ning
>
> Just to be clear on what I was suggesting, I have created a patch only for
> this file.
> Please have a look.
>
> Thanks,
> MIS.
>
>
> On Thu, Mar 3, 2011 at 5:50 PM, M IS <misapa...@gmail.com> wrote:
>
>> This is an automatically generated e-mail. To reply, visit:
>> https://reviews.apache.org/r/460/
>>
>> trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java<https://reviews.apache.org/r/460/diff/1/?file=13550#file13550line82>
>> (Diff revision 1)
>>
>> public void run(SessionState sess, Set<ReadEntity> inputs,
>>
>>     Thread[] threads = new Thread[nThreads];
>>
>> How about going for a centralized thread pool and submitting the tasks to
>> that pool? This has advantages: we do not have to create threads ourselves,
>> and we can learn the status of a submitted task through its Future object
>> and use that Future to wait until the task is finished. We can refactor the
>> code to make UpdateWorker implement Runnable instead of extending Thread.
>>
>> - M
>>
>> On March 3rd, 2011, 12:53 a.m., Ning Zhang wrote:
>> Review request for hive.
>> By Ning Zhang.
>>
>> *Updated 2011-03-03 00:53:49*
>> Description
>>
>> define hive.hooks.parallel.degree to control the max # of threads used to
>> update the metastore in parallel.
>>
>> Diffs
>>
>> - trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (1076459)
>> - trunk/conf/hive-default.xml (1076459)
>> - trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (1076459)
>> - trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (1076459)
>>
>> View Diff <https://reviews.apache.org/r/460/diff/>
>>
>
> <HIVE-2026_1.patch>
>
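A quick note for anyone trying out the patch text that follows: the hook class is the existing org.apache.hadoop.hive.ql.hooks.UpdateInputAccessTimeHook$PreExec (enabled via hive.exec.pre.hooks, if I have the property name right), and the new hive.hooks.parallel.degree property controls the pool size. The hook clamps the configured value to at least 1, to at most METASTORESERVERMAXTHREADS, and to no more than the number of inputs, so over-configuring it should be harmless.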
### Eclipse Workspace Patch 1.0
#P hive
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1076715)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -111,6 +111,7 @@
     DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name",
         "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__"),
     // Whether to show a link to the most failed task + debugging tips
     SHOW_JOB_FAIL_DEBUG_INFO("hive.exec.show.job.failure.debug.info", true),
+    HOOKS_PARALLEL_DEGREE("hive.hooks.parallel.degree", 1),
     // should hive determine whether to run in local mode automatically ?
     LOCALMODEAUTO("hive.exec.mode.local.auto", false),
Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 1076715)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy)
@@ -26,9 +26,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -69,26 +69,26 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
 import org.apache.hadoop.hive.metastore.model.MDatabase;
 import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege;
 import org.apache.hadoop.hive.metastore.model.MIndex;
 import org.apache.hadoop.hive.metastore.model.MOrder;
 import org.apache.hadoop.hive.metastore.model.MPartition;
-import org.apache.hadoop.hive.metastore.model.MDBPrivilege;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MPartitionPrivilege;
 import org.apache.hadoop.hive.metastore.model.MRole;
-import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
-import org.apache.hadoop.hive.metastore.model.MGlobalPrivilege;
 import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
 import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
 import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
 import org.apache.hadoop.hive.metastore.model.MType;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.ANTLRNoCaseStringStream;
 import org.apache.hadoop.hive.metastore.parser.FilterLexer;
 import org.apache.hadoop.hive.metastore.parser.FilterParser;
-import org.apache.hadoop.hive.metastore.parser.ExpressionTree.ANTLRNoCaseStringStream;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -405,7 +405,7 @@
     db.setParameters(mdb.getParameters());
     return db;
   }
-
+
   /**
    * Alter the database object in metastore. Currently only the parameters
   * of the database can be changed.
@@ -449,7 +449,6 @@
       // then drop the database
       MDatabase db = getMDatabase(dbname);
-      pm.retrieve(db);
       if (db != null) {
         List<MDBPrivilege> dbGrants = this.listDatabaseGrants(dbname);
         if (dbGrants != null && dbGrants.size() > 0) {
@@ -614,7 +613,7 @@
         Map<String, List<PrivilegeGrantInfo>> groupPrivs = principalPrivs.getGroupPrivileges();
         putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, groupPrivs, PrincipalType.GROUP);
-
+
         Map<String, List<PrivilegeGrantInfo>> rolePrivs = principalPrivs.getRolePrivileges();
         putPersistentPrivObjects(mtbl, toPersistPrivObjs, now, rolePrivs, PrincipalType.ROLE);
       }
@@ -631,7 +630,7 @@
   * Convert PrivilegeGrantInfo from privMap to MTablePrivilege, and add all of
   * them to the toPersistPrivObjs. These privilege objects will be persisted as
   * part of createTable.
-
+
   * @param mtbl
   * @param toPersistPrivObjs
   * @param now
@@ -665,7 +664,6 @@
     try {
       openTransaction();
       MTable tbl = getMTable(dbName, tableName);
-      pm.retrieve(tbl);
       if (tbl != null) {
         // first remove all the partitions
         List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
@@ -682,7 +680,7 @@
         if (partGrants != null && partGrants.size() > 0) {
           pm.deletePersistentAll(partGrants);
         }
-
+
         List<MPartitionColumnPrivilege> partColGrants = listTableAllPartitionColumnGrants(dbName, tableName);
         if (partColGrants != null && partColGrants.size() > 0) {
@@ -700,7 +698,7 @@
     }
     return success;
   }
-
+
   public Table getTable(String dbName, String tableName) throws MetaException {
     boolean commited = false;
     Table tbl = null;
@@ -960,7 +958,7 @@
           toPersist.add(partGrant);
         }
       }
-
+
       if (tabColumnGrants != null) {
         for (MTableColumnPrivilege col : tabColumnGrants) {
           MPartitionColumnPrivilege partColumn = new MPartitionColumnPrivilege(col
@@ -969,7 +967,7 @@
               .getGrantorType(), col.getGrantOption());
           toPersist.add(partColumn);
         }
-
+
         if (toPersist.size() > 0) {
           pm.makePersistentAll(toPersist);
         }
@@ -996,7 +994,7 @@
     }
     return part;
   }
-
+
   private MPartition getMPartition(String dbName, String tableName,
       List<String> part_vals) throws MetaException {
     MPartition mpart = null;
@@ -1039,10 +1037,13 @@
       throw new InvalidObjectException(
           "Partition doesn't have a valid table or database name");
     }
-    return new MPartition(Warehouse.makePartName(convertToFieldSchemas(mt
-        .getPartitionKeys()), part.getValues()), mt, part.getValues(), part
-        .getCreateTime(), part.getLastAccessTime(),
-        convertToMStorageDescriptor(part.getSd()), part.getParameters());
+    return new MPartition(
+        Warehouse.makePartName(convertToFieldSchemas(mt.getPartitionKeys()), part.getValues()),
+        mt, part.getValues(),
+        part.getCreateTime(),
+        part.getLastAccessTime(),
+        convertToMStorageDescriptor(part.getSd()),
+        part.getParameters());
   }
 
   private Partition convertToPart(MPartition mpart) throws MetaException {
@@ -1068,7 +1069,7 @@
         colNames.add(col.getName());
       }
       String partName = FileUtils.makePartName(colNames, part_vals);
-
+
       List<MPartitionPrivilege> partGrants = listPartitionGrants(
           dbName, tableName, partName);
@@ -1134,7 +1135,7 @@
       }
     }
   }
-
+
   @Override
   public Partition getPartitionWithAuth(String dbName, String tblName,
       List<String> partVals, String user_name, List<String> group_names)
@@ -1778,7 +1779,7 @@
     }
     return success;
   }
-
+
   private MRoleMap getMSecurityUserRoleMap(String userName,
       PrincipalType principalType, String roleName) {
     MRoleMap mRoleMember = null;
@@ -1806,7 +1807,6 @@
     try {
       openTransaction();
      MRole mRol = getMRole(roleName);
-      pm.retrieve(mRol);
       if (mRol != null) {
         // first remove all the membership, the membership that this role has
         // been granted
@@ -1861,7 +1861,7 @@
     }
     return success;
   }
-
+
   private List<MRoleMap> listRoles(String userName,
       List<String> groupNames) {
     List<MRoleMap> ret = new ArrayList<MRoleMap>();
@@ -1875,7 +1875,7 @@
     }
     return ret;
   }
-
+
   @SuppressWarnings("unchecked")
   @Override
   public List<MRoleMap> listRoles(String principalName,
@@ -1942,7 +1942,7 @@
         .getOwnerName());
     return ret;
   }
-
+
   private MRole getMRole(String roleName) {
     MRole mrole = null;
     boolean commited = false;
@@ -1961,7 +1961,7 @@
     }
     return mrole;
   }
-
+
   public List<String> listRoleNames() {
     boolean success = false;
     try {
@@ -1982,7 +1982,7 @@
       }
     }
   }
-
+
   @Override
   public PrincipalPrivilegeSet getUserPrivilegeSet(String userName,
       List<String> groupNames) throws InvalidObjectException, MetaException {
@@ -2030,7 +2030,7 @@
     }
     return ret;
   }
-
+
   public List<PrivilegeGrantInfo> getDBPrivilege(String dbName,
       String principalName, PrincipalType principalType)
       throws InvalidObjectException, MetaException {
@@ -2054,7 +2054,7 @@
     return new ArrayList<PrivilegeGrantInfo>(0);
   }
-
+
   @Override
   public PrincipalPrivilegeSet getDBPrivilegeSet(String dbName,
       String userName, List<String> groupNames) throws InvalidObjectException,
@@ -2185,7 +2185,7 @@
     }
     return ret;
   }
-
+
   @Override
   public PrincipalPrivilegeSet getColumnPrivilegeSet(String dbName,
       String tableName, String partitionName, String columnName,
@@ -2231,7 +2231,7 @@
     }
     return ret;
   }
-
+
   private List<PrivilegeGrantInfo> getPartitionPrivilege(String dbName,
       String tableName, String partName, String principalName,
       PrincipalType principalType) {
@@ -2262,7 +2262,7 @@
   private PrincipalType getPrincipalTypeFromStr(String str) {
     return str == null ? null : PrincipalType.valueOf(str);
   }
-
+
   private List<PrivilegeGrantInfo> getTablePrivilege(String dbName,
       String tableName, String principalName, PrincipalType principalType) {
     tableName = tableName.toLowerCase().trim();
@@ -2286,11 +2286,11 @@
     }
     return new ArrayList<PrivilegeGrantInfo>(0);
   }
-
+
   private List<PrivilegeGrantInfo> getColumnPrivilege(String dbName,
       String tableName, String columnName, String partitionName,
       String principalName, PrincipalType principalType) {
-
+
     tableName = tableName.toLowerCase().trim();
     dbName = dbName.toLowerCase().trim();
     columnName = columnName.toLowerCase().trim();
@@ -2472,7 +2472,7 @@
                 userName, principalType, hiveObject.getDbName(), hiveObject
                     .getObjectName(), partObj.getPartitionName(),
                 hiveObject.getColumnName());
-
+
             if (colPrivs != null) {
               for (MPartitionColumnPrivilege priv : colPrivs) {
                 if (priv.getGrantor().equalsIgnoreCase(grantor)) {
@@ -2495,13 +2495,13 @@
                 grantOption);
             persistentObjs.add(mCol);
           }
-
+
         } else {
           List<MTableColumnPrivilege> colPrivs = null;
           colPrivs = this.listPrincipalTableColumnGrants(
              userName, principalType, hiveObject.getDbName(), hiveObject
                  .getObjectName(), hiveObject.getColumnName());
-
+
           if (colPrivs != null) {
            for (MTableColumnPrivilege priv : colPrivs) {
              if (priv.getGrantor().equalsIgnoreCase(grantor)) {
@@ -2539,7 +2539,7 @@
     }
     return committed;
   }
-
+
   @Override
   public boolean revokePrivileges(PrivilegeBag privileges)
       throws InvalidObjectException, MetaException, NoSuchObjectException {
@@ -2547,13 +2547,13 @@
     try {
       openTransaction();
       List<Object> persistentObjs = new ArrayList<Object>();
-
+
       List<HiveObjectPrivilege> privilegeList = privileges.getPrivileges();
-
+
       if (privilegeList != null && privilegeList.size() > 0) {
         Iterator<HiveObjectPrivilege> privIter = privilegeList.iterator();
-
+
         while (privIter.hasNext()) {
           HiveObjectPrivilege privDef = privIter.next();
           HiveObjectRef hiveObject = privDef.getHiveObject();
@@ -2585,7 +2585,7 @@
               }
             }
           }
-
+
         } else if (hiveObject.getObjectType() == HiveObjectType.DATABASE) {
           MDatabase dbObj = getMDatabase(hiveObject.getDbName());
           if (dbObj != null) {
@@ -2630,7 +2630,7 @@
             }
           }
         } else if (hiveObject.getObjectType() == HiveObjectType.PARTITION) {
-
+
           boolean found = false;
           Table tabObj = this.getTable(hiveObject.getDbName(), hiveObject.getObjectName());
           String partName = null;
@@ -2664,7 +2664,7 @@
             partName = Warehouse.makePartName(tabObj.getPartitionKeys(),
                 hiveObject.getPartValues());
           }
-
+
           if (partName != null) {
             List<MPartitionColumnPrivilege> mSecCol = listPrincipalPartitionColumnGrants(
                 userName, principalType, hiveObject.getDbName(), hiveObject
@@ -2718,7 +2718,7 @@
           }
         }
       }
-
+
       if (persistentObjs.size() > 0) {
         pm.deletePersistentAll(persistentObjs);
       }
@@ -2730,7 +2730,7 @@
     }
     return committed;
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MRoleMap> listRoleMembers(
       MRole mRol) {
@@ -2756,7 +2756,7 @@
     }
     return mRoleMemeberList;
   }
-
+
   @SuppressWarnings("unchecked")
   @Override
   public List<MGlobalPrivilege> listPrincipalGlobalGrants(String principalName, PrincipalType principalType) {
@@ -2809,7 +2809,7 @@
     }
     return mSecurityDBList;
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MDBPrivilege> listPrincipalAllDBGrant(
       String principalName, PrincipalType principalType) {
@@ -2866,7 +2866,7 @@
     }
     return mSecurityTabList;
   }
-
+
   @SuppressWarnings("unchecked")
   public List<MPartitionPrivilege> listTableAllPartitionGrants(String dbName,
       String tableName) {
@@ -2896,7 +2896,7 @@
     }
     return mSecurityTabPartList;
   }
-
+
   @SuppressWarnings("unchecked")
   public List<MTableColumnPrivilege> listTableAllColumnGrants(String dbName,
       String tableName) {
@@ -2924,7 +2924,7 @@
     }
     return mTblColPrivilegeList;
   }
-
+
   @SuppressWarnings("unchecked")
   public List<MPartitionColumnPrivilege> listTableAllPartitionColumnGrants(String dbName,
       String tableName) {
@@ -2952,7 +2952,7 @@
     }
     return mSecurityColList;
   }
-
+
   @SuppressWarnings("unchecked")
   public List<MPartitionColumnPrivilege> listPartitionAllColumnGrants(String dbName,
       String tableName, String partName) {
@@ -3006,7 +3006,7 @@
       }
     }
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MPartitionPrivilege> listPartitionGrants(String dbName, String tableName,
       String partName) {
@@ -3091,7 +3091,7 @@
       mSecurityTabPartList = (List<MPartitionPrivilege>) query
           .executeWithArray(principalName, principalType.toString(), tableName, dbName, partName);
       LOG.debug("Done executing query for listMSecurityPrincipalPartitionGrant");
-
+
       pm.retrieveAll(mSecurityTabPartList);
       success = commitTransaction();
       LOG.debug("Done retrieving all objects for listMSecurityPrincipalPartitionGrant");
@@ -3136,7 +3136,7 @@
     }
     return mSecurityColList;
   }
-
+
   @SuppressWarnings("unchecked")
   public List<MPartitionColumnPrivilege> listPrincipalPartitionColumnGrants(
       String principalName, PrincipalType principalType, String dbName,
@@ -3175,7 +3175,7 @@
     }
     return mSecurityColList;
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MTablePrivilege> listPrincipalAllTableGrants(
       String principalName, PrincipalType principalType) {
@@ -3229,7 +3229,7 @@
     }
     return mSecurityTabPartList;
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MTableColumnPrivilege> listPrincipalAllTableColumnGrants(
       String principalName, PrincipalType principalType) {
@@ -3255,7 +3255,7 @@
     }
     return mSecurityColumnList;
   }
-
+
   @SuppressWarnings("unchecked")
   private List<MPartitionColumnPrivilege> listPrincipalAllPartitionColumnGrants(
       String principalName, PrincipalType principalType) {
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1076715)
+++ conf/hive-default.xml (working copy)
@@ -490,6 +490,12 @@
 </property>
 
 <property>
+  <name>hive.hooks.parallel.degree</name>
+  <value>1</value>
+  <description>Maximum number of threads that can be run in hooks (particularly in UpdateInputAccessTimeHook).</description>
+</property>
+
+<property>
   <name>hive.merge.mapfiles</name>
   <value>true</value>
   <description>Merge small files at the end of a map-only job</description>
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (revision 1076715)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (working copy)
@@ -17,18 +17,24 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
  * Implementation of a pre execute hook that updates the access
  * times for all the inputs.
@@ -39,7 +45,6 @@
 
  public static class PreExec implements PreExecute {
    Hive db;
-
    public void run(SessionState sess, Set<ReadEntity> inputs,
        Set<WriteEntity> outputs, UserGroupInformation ugi)
        throws Exception {
@@ -54,35 +59,122 @@
        }
      }
+      if (inputs.size() == 0) {
+        return;
+      }
+
      int lastAccessTime = (int) (System.currentTimeMillis()/1000);
-      for(ReadEntity re: inputs) {
-        // Set the last query time
+      int nThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.HOOKS_PARALLEL_DEGREE);
+      int maxThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+
+      if (nThreads < 1) {
+        nThreads = 1;
+      } else if (nThreads > maxThreads) {
+        nThreads = maxThreads;
+      }
+      if (nThreads > inputs.size()) {
+        nThreads = inputs.size();
+      }
+
+      // This can be a rather common/centrally used thread pool.
+      ExecutorService exeService = Executors.newFixedThreadPool(nThreads);
+      List<Future<?>> futures = new ArrayList<Future<?>>(nThreads);
+
+      List<ReadEntity>[] threadInputs = new List[nThreads];
+
+      // assign ReadEntities to threads
+      int i = 0;
+      for (i = 0; i < nThreads; ++i) {
+        threadInputs[i] = new ArrayList<ReadEntity>();
+      }
+
+      i = 0;
+      for (ReadEntity re: inputs) {
+        threadInputs[i % nThreads].add(re);
+        ++i;
+      }
+
+      try {
+        // launch all threads
+        Runnable updateWorker;
+        Future<?> futureTask;
+        for (i = 0; i < nThreads; ++i) {
+          updateWorker = new UpdateWorker(sess.getConf(), threadInputs[i], lastAccessTime);
+          futureTask = exeService.submit(updateWorker);
+          futures.add(futureTask);
+        }
+
+        // wait for all tasks to finish
+        for (Future<?> future : futures) {
+          future.get();
+        }
+      } catch (Throwable e) {
+        // wrap the RuntimeException thrown from threads
+        throw new HiveException(e);
+      }
+      finally {
+        exeService.shutdown();
+      }
+    }
+  }
+
+  public static class UpdateWorker implements Runnable {
+    List<ReadEntity> reList;
+    // each thread must have a different Hive object
+    Hive db;
+    int lastAccessTime;
+    // keep track of which table's lastAccessTime has been updated
+    static Set<String> updatedTables = Collections.synchronizedSet(new HashSet<String>());
+
+    public UpdateWorker(HiveConf conf, List<ReadEntity> ent, int time)
+        throws HiveException {
+      super();
+      this.reList = ent;
+      this.db = Hive.get(conf); // get a thread local object
+      this.lastAccessTime = time;
+    }
+
+    @Override
+    public void run() {
+      for (ReadEntity re: reList) {
        ReadEntity.Type typ = re.getType();
+        String tblName = null;
+        try {
        switch(typ) {
        // It is possible that read and write entities contain a old version
        // of the object, before it was modified by StatsTask.
        // Get the latest versions of the object
-        case TABLE: {
-          Table t = db.getTable(re.getTable().getTableName());
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
-          break;
-        }
        case PARTITION: {
          Partition p = re.getPartition();
-          Table t = db.getTable(p.getTable().getTableName());
-          p = db.getPartition(t, p.getSpec(), false);
+          tblName = p.getTable().getTableName();
          p.setLastAccessTime(lastAccessTime);
-          db.alterPartition(t.getTableName(), p);
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
+          db.alterPartition(tblName, p);
+          if (updatedTables.contains(tblName)) {
+            break;
+          }
+          // fall through to update table
+        }
+        case TABLE: {
+          if (tblName == null) {
+            tblName = re.getTable().getTableName();
+          }
+          if (!updatedTables.contains(tblName)) {
+            updatedTables.add(tblName);
+            Table t = db.getTable(tblName);
+            t.setLastAccessTime(lastAccessTime);
+            db.alterTable(tblName, t);
+          }
          break;
        }
        default:
          // ignore dummy inputs
          break;
        }
+        } catch(Exception e) {
+          // fail the hook if any update failed.
+          throw new RuntimeException(e);
+        }
      }
    }
  }