Hi, Ning

To make clear what I was suggesting, I have created a patch for this file
only; it appears below the quoted thread, after a short standalone sketch
of the pool/Future pattern.
Please have a look.

Thanks,
MIS.


On Thu, Mar 3, 2011 at 5:50 PM, M IS <misapa...@gmail.com> wrote:

>    This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/460/
>
> trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
> <https://reviews.apache.org/r/460/diff/1/?file=13550#file13550line82>
> (Diff revision 1)
>
> In public void run(SessionState sess, Set<ReadEntity> inputs, ...),
> line 77:
>
>       Thread[] threads = new Thread[nThreads];
>
>   How about going for a centralized thread pool and submitting the tasks
> to that pool? This has a couple of advantages: we would not have to create
> the threads ourselves, and we could track the status of each submitted
> task through its Future object, using that Future to wait until the task
> is finished. We could also refactor the code so that UpdateWorker
> implements Runnable instead of extending Thread.
>
>
> - M
>
> On March 3rd, 2011, 12:53 a.m., Ning Zhang wrote:
> Review request for hive, by Ning Zhang.
>
> *Updated 2011-03-03 00:53:49*
>
> Description:
>
> define hive.hooks.parallel.degree to control the max # of threads used to
> update the metastore in parallel.
>
> Diffs:
>
>    - trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
>    (1076459)
>    - trunk/conf/hive-default.xml (1076459)
>    - 
> trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
>    (1076459)
>    - 
> trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
>    (1076459)
>
> View Diff <https://reviews.apache.org/r/460/diff/>
>
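
For reference, here is a minimal, self-contained sketch of the pool/Future
pattern described above. The class and task are illustrative only, not
taken from the Hive codebase:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSketch {
  public static void main(String[] args) throws Exception {
    // Fixed-size pool; in the patch below, nThreads is derived from
    // hive.hooks.parallel.degree and capped by the metastore max threads.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<Future<?>>();
    try {
      for (int i = 0; i < 8; ++i) {
        final int id = i;
        // Submit a Runnable; the returned Future tracks the task's status.
        futures.add(pool.submit(new Runnable() {
          public void run() {
            System.out.println("task " + id + " on "
                + Thread.currentThread().getName());
          }
        }));
      }
      // get() blocks until the task finishes and rethrows any exception
      // the task threw, wrapped in an ExecutionException.
      for (Future<?> f : futures) {
        f.get();
      }
    } finally {
      pool.shutdown();
    }
  }
}

Executors.newFixedThreadPool(n) builds essentially the same
ThreadPoolExecutor the patch constructs by hand; the explicit constructor
is used in the patch so the queue and keep-alive can be tuned if the pool
is later shared.
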
### Eclipse Workspace Patch 1.0
#P hive
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java	(revision 1076702)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/UpdateInputAccessTimeHook.java	(working copy)
@@ -17,18 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.hooks;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
 
+
 /**
  * Implementation of a pre execute hook that updates the access
  * times for all the inputs.
@@ -39,7 +47,6 @@
 
   public static class PreExec implements PreExecute {
     Hive db;
-
     public void run(SessionState sess, Set<ReadEntity> inputs,
                     Set<WriteEntity> outputs, UserGroupInformation ugi)
       throws Exception {
@@ -54,35 +61,122 @@
         }
       }
 
+      if (inputs.isEmpty()) {
+        return;
+      }
+
       int lastAccessTime = (int) (System.currentTimeMillis()/1000);
 
-      for(ReadEntity re: inputs) {
-        // Set the last query time
+      int nThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.HOOKS_PARALLEL_DEGREE);
+      int maxThreads = HiveConf.getIntVar(sess.getConf(), HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+
+      if (nThreads < 1) {
+        nThreads = 1;
+      } else if (nThreads > maxThreads) {
+        nThreads = maxThreads;
+      }
+      if (nThreads > inputs.size()) {
+        nThreads = inputs.size();
+      }
+
+      // This could eventually be a shared, centrally managed thread pool.
+      ExecutorService exeService = new ThreadPoolExecutor(nThreads, nThreads, 5000, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>());
+      List<Future<?>> futures = new ArrayList<Future<?>>(nThreads);
+
+      List<ReadEntity>[] threadInputs = new List[nThreads];
+
+      // assign ReadEntities to threads
+      int i = 0;
+      for (i = 0; i < nThreads; ++i) {
+        threadInputs[i] = new ArrayList<ReadEntity>();
+      }
+
+      i = 0;
+      for (ReadEntity re: inputs) {
+        threadInputs[i % nThreads].add(re);
+        ++i;
+      }
+
+      try {
+        // launch all threads
+        Runnable updateWorker;
+        Future<?> futureTask;
+        for (i = 0; i < nThreads; ++i) {
+          updateWorker = new UpdateWorker(sess.getConf(), threadInputs[i], lastAccessTime);
+          futureTask = exeService.submit(updateWorker);
+          futures.add(futureTask);
+        }
+
+        // wait for all the tasks to finish
+        for (Future<?> future : futures) {
+          future.get();
+        }
+      } catch (Throwable e) {
+        // wrap the RuntimeException thrown from threads
+        throw new HiveException(e);
+      } finally {
+        exeService.shutdown();
+      }
+    }
+  }
+
+  public static class UpdateWorker implements Runnable {
+    List<ReadEntity> reList;
+    // each thread must have a different Hive object
+    Hive db;
+    int lastAccessTime;
+    // keep track of which table's lastAccessTime has been updated
+    static Set<String> updatedTables = Collections.synchronizedSet(new HashSet<String>());
+
+    public UpdateWorker(HiveConf conf, List<ReadEntity> ent, int time)
+        throws HiveException {
+      super();
+      this.reList = ent;
+      this.db = Hive.get(conf); // get a thread local object
+      this.lastAccessTime = time;
+    }
+
+    @Override
+    public void run() {
+      for (ReadEntity re: reList) {
         ReadEntity.Type typ = re.getType();
+        String tblName = null;
+        try {
         switch(typ) {
         // It is possible that read and write entities contain a old version
         // of the object, before it was modified by StatsTask.
         // Get the latest versions of the object
-        case TABLE: {
-          Table t = db.getTable(re.getTable().getTableName());
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
-          break;
-        }
         case PARTITION: {
           Partition p = re.getPartition();
-          Table t = db.getTable(p.getTable().getTableName());
-          p = db.getPartition(t, p.getSpec(), false);
+          tblName = p.getTable().getTableName();
           p.setLastAccessTime(lastAccessTime);
-          db.alterPartition(t.getTableName(), p);
-          t.setLastAccessTime(lastAccessTime);
-          db.alterTable(t.getTableName(), t);
+          db.alterPartition(tblName, p);
+          if (updatedTables.contains(tblName)) {
+            break;
+          }
+          // fall through to update table
+        }
+        case TABLE: {
+          if (tblName == null) {
+            tblName = re.getTable().getTableName();
+          }
+          // add() returns false if already present (atomic on the synchronized set)
+          if (updatedTables.add(tblName)) {
+            Table t = db.getTable(tblName);
+            t.setLastAccessTime(lastAccessTime);
+            db.alterTable(tblName, t);
+          }
           break;
         }
         default:
           // ignore dummy inputs
           break;
         }
+        } catch(Exception e) {
+          // fail the hook if any update failed.
+          throw new RuntimeException(e);
+        }
       }
     }
   }
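
To try this out, apply the patch from a trunk checkout (paths in the patch
are relative to the project root; the file name below is just what I saved
it as):

  patch -p0 -i UpdateInputAccessTimeHook.patch

and set the degree of parallelism, e.g. in hive-site.xml. The value here is
only an example; the variable itself comes from the HiveConf change listed
in the review's diffs:

  <property>
    <name>hive.hooks.parallel.degree</name>
    <value>4</value>
  </property>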
