Hi Ning,

Just to be clear about what I was suggesting, I have created a patch for this file only. Please have a look.
Thanks,
MIS.

On Thu, Mar 3, 2011 at 5:50 PM, M IS <misapa...@gmail.com> wrote:

> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/460/
>
> trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
> <https://reviews.apache.org/r/460/diff/1/?file=13550#file13550line82>
> (Diff revision 1)
>
>     public void run(SessionState sess, Set<ReadEntity> inputs,
>
>     77    Thread[] threads = new Thread[nThreads];
>
> How about going for a centralized thread pool and submitting the tasks to
> that pool? This has a couple of advantages: we do not have to create the
> threads ourselves, and the Future object returned for each submitted task
> tells us its status and lets us wait until the task is finished. We can
> refactor the code to make UpdateWorker implement Runnable instead of
> extending Thread.
>
> - M
>
> On March 3rd, 2011, 12:53 a.m., Ning Zhang wrote:
>
> Review request for hive.
> By Ning Zhang.
>
> *Updated 2011-03-03 00:53:49*
>
> Description
>
> define hive.hooks.parallel.degree to control the max # of threads used to
> update the metastore in parallel.
>
> Diffs
>
> - trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (1076459)
> - trunk/conf/hive-default.xml (1076459)
> - trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (1076459)
> - trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (1076459)
>
> View Diff <https://reviews.apache.org/r/460/diff/>
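For concreteness, the suggestion quoted above boils down to roughly the following pattern (a minimal standalone sketch, not Hive code; the pool size, task count, and class name are all illustrative). The attached patch applies the same idea inside the hook:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolSketch {
  public static void main(String[] args) throws Exception {
    // A centralized pool: tasks are submitted to it instead of each
    // caller constructing and starting raw Threads.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<Future<?>>();

    for (int i = 0; i < 8; i++) {
      final int taskId = i;
      // submit() returns a Future that tracks the task's status.
      futures.add(pool.submit(new Runnable() {
        public void run() {
          System.out.println("task " + taskId + " finished");
        }
      }));
    }

    // Future.get() blocks until the task completes, and rethrows any
    // exception the task threw, wrapped in an ExecutionException.
    for (Future<?> f : futures) {
      f.get();
    }
    pool.shutdown();
  }
}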
### Eclipse Workspace Patch 1.0
#P hive
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (revision 1076702)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java (working copy)
@@ -17,18 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
  * Implementation of a pre execute hook that updates the access
  * times for all the inputs.
@@ -39,7 +47,6 @@
   public static class PreExec implements PreExecute {
     Hive db;
-
     public void run(SessionState sess, Set<ReadEntity> inputs,
         Set<WriteEntity> outputs, UserGroupInformation ugi)
       throws Exception {
@@ -54,35 +61,122 @@
       }
     }
 
+      if (inputs.size() == 0) {
+        return;
+      }
+
       int lastAccessTime = (int) (System.currentTimeMillis()/1000);
-      for(ReadEntity re: inputs) {
-        // Set the last query time
+      int nThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.HOOKS_PARALLEL_DEGREE);
+      int maxThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+
+      if (nThreads < 1) {
+        nThreads = 1;
+      } else if (nThreads > maxThreads) {
+        nThreads = maxThreads;
+      }
+      if (nThreads > inputs.size()) {
+        nThreads = inputs.size();
+      }
+
+      // This can be a rather common/centrally used thread pool.
+      ExecutorService exeService = new ThreadPoolExecutor(nThreads, nThreads, 5000,
+          TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+      List<Future<?>> futures = new ArrayList<Future<?>>(nThreads);
+
+      List<ReadEntity>[] threadInputs = new List[nThreads];
+
+      // assign ReadEntities to threads
+      int i = 0;
+      for (i = 0; i < nThreads; ++i) {
+        threadInputs[i] = new ArrayList<ReadEntity>();
+      }
+
+      i = 0;
+      for (ReadEntity re: inputs) {
+        threadInputs[i % nThreads].add(re);
+        ++i;
+      }
+
+      try {
+        // launch all threads
+        Runnable updateWorker;
+        Future<?> futureTask;
+        for (i = 0; i < nThreads; ++i) {
+          updateWorker = new UpdateWorker(sess.getConf(), threadInputs[i], lastAccessTime);
+          futureTask = exeService.submit(updateWorker);
+          futures.add(futureTask);
+        }
+
+        // wait for all the tasks to finish
+        for (Future<?> future : futures) {
+          future.get();
+        }
+      } catch (Throwable e) {
+        // wrap the RuntimeException thrown from threads
+        throw new HiveException(e);
+      } finally {
+        exeService.shutdown();
+      }
+    }
+  }
+
+  public static class UpdateWorker implements Runnable {
+    List<ReadEntity> reList;
+    // each thread must have a different Hive object
+    Hive db;
+    int lastAccessTime;
+    // keep track of which table's lastAccessTime has been updated
+    static Set<String> updatedTables = Collections.synchronizedSet(new HashSet<String>());
+
+    public UpdateWorker(HiveConf conf, List<ReadEntity> ent, int time)
+        throws HiveException {
+      super();
+      this.reList = ent;
+      this.db = Hive.get(conf); // get a thread local object
+      this.lastAccessTime = time;
+    }
+
+    @Override
+    public void run() {
+      for (ReadEntity re: reList) {
         ReadEntity.Type typ = re.getType();
+        String tblName = null;
+        try {
         switch(typ) {
         // It is possible that read and write entities contain a old version
         // of the object, before it was modified by StatsTask.
         // Get the latest versions of the object
-        case TABLE: {
-          Table t = db.getTable(re.getTable().getTableName());
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
-          break;
-        }
         case PARTITION: {
           Partition p = re.getPartition();
-          Table t = db.getTable(p.getTable().getTableName());
-          p = db.getPartition(t, p.getSpec(), false);
+          tblName = p.getTable().getTableName();
           p.setLastAccessTime(lastAccessTime);
-          db.alterPartition(t.getTableName(), p);
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
+          db.alterPartition(tblName, p);
+          if (updatedTables.contains(tblName)) {
+            break;
+          }
+          // fall through to update table
+        }
+        case TABLE: {
+          if (tblName == null) {
+            tblName = re.getTable().getTableName();
+          }
+          if (!updatedTables.contains(tblName)) {
+            updatedTables.add(tblName);
+            Table t = db.getTable(tblName);
+            t.setLastAccessTime(lastAccessTime);
+            db.alterTable(tblName, t);
+          }
           break;
         }
         default:
           // ignore dummy inputs
           break;
         }
+        } catch(Exception e) {
+          // fail the hook if any update failed.
+          throw new RuntimeException(e);
+        }
       }
     }
   }
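Two details of the patch may deserve a second look: the round-robin assignment of inputs to workers, and the shared synchronized set that is meant to keep a table's lastAccessTime from being written more than once when several of its partitions land on different workers. A minimal standalone sketch of both ideas (plain Strings stand in for ReadEntity here, and nothing touches the metastore):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistributeSketch {
  public static void main(String[] args) {
    List<String> inputs = Arrays.asList("t1/p1", "t1/p2", "t2/p1", "t2/p2", "t3/p1");
    int nThreads = 2;

    // Round-robin assignment, mirroring the patch's "i % nThreads" loop.
    List<List<String>> buckets = new ArrayList<List<String>>();
    for (int t = 0; t < nThreads; t++) {
      buckets.add(new ArrayList<String>());
    }
    int i = 0;
    for (String re : inputs) {
      buckets.get(i % nThreads).add(re);
      i++;
    }
    System.out.println(buckets); // [[t1/p1, t2/p1, t3/p1], [t1/p2, t2/p2]]

    // Shared synchronized set: the first worker to claim a table updates
    // it; later entities for the same table skip the redundant update.
    Set<String> updatedTables = Collections.synchronizedSet(new HashSet<String>());
    for (String re : inputs) {
      String tbl = re.substring(0, re.indexOf('/'));
      // add() is atomic and returns false if the table was already claimed.
      if (updatedTables.add(tbl)) {
        System.out.println("update lastAccessTime of table " + tbl);
      }
    }
  }
}

One caveat on the patch as written: contains() followed by add() on a synchronized set is a check-then-act sequence, so two workers could both see a table as not yet updated and both call alterTable(). Since Set.add() returns false when the element is already present, testing its return value (as in the sketch above) closes that window.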