Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Sep 12 01:21:10 2013
@@ -19,9 +19,18 @@
 package org.apache.hive.service.cli.session;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.SessionHandle;
@@ -32,11 +41,12 @@ import org.apache.hive.service.cli.opera
  *
  */
 public class SessionManager extends CompositeService {
-
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
   private HiveConf hiveConf;
   private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
   private OperationManager operationManager = new OperationManager();
   private static final Object sessionMapLock = new Object();
+  private ExecutorService backgroundOperationPool;
 
   public SessionManager() {
     super("SessionManager");
@@ -45,10 +55,11 @@ public class SessionManager extends Comp
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-
     operationManager = new OperationManager();
+    int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Async execution pool size: " + backgroundPoolSize);
+    backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
     addService(operationManager);
-
     super.init(hiveConf);
   }
 
@@ -62,6 +73,16 @@ public class SessionManager extends Comp
   public synchronized void stop() {
     // TODO
     super.stop();
+    if (backgroundOperationPool != null) {
+      backgroundOperationPool.shutdown();
+      long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      try {
+        backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
+      } catch (InterruptedException exc) {
+        LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
+            " seconds has been exceeded. RUNNING background operations will be shut down", exc);
+      }
+    }
   }
 
 
@@ -72,16 +93,15 @@ public class SessionManager extends Comp
 
   public SessionHandle openSession(String username, String password, Map<String, String> sessionConf,
           boolean withImpersonation, String delegationToken) throws HiveSQLException {
-    HiveSession session;
     if (username == null) {
       username = threadLocalUserName.get();
     }
-
+    HiveSession session;
     if (withImpersonation) {
-          HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, sessionConf,
-              delegationToken);
-          session = (HiveSession)HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
-          hiveSessionUgi.setProxySession(session);
+      HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password,
+        sessionConf, delegationToken);
+      session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
+      hiveSessionUgi.setProxySession(session);
     } else {
       session = new HiveSessionImpl(username, password, sessionConf);
     }
@@ -90,6 +110,11 @@ public class SessionManager extends Comp
     synchronized(sessionMapLock) {
       handleToSession.put(session.getSessionHandle(), session);
     }
+    try {
+      executeSessionHooks(session);
+    } catch (Exception e) {
+      throw new HiveSQLException("Failed to execute session hooks", e);
+    }
     return session.getSessionHandle();
   }
 
@@ -150,4 +175,17 @@ public class SessionManager extends Comp
     threadLocalUserName.remove();
   }
 
+  // Run the session hooks configured via hive.server2.session.hook.
+  private void executeSessionHooks(HiveSession session) throws Exception {
+    List<HiveSessionHook> sessionHooks = HookUtils.getHooks(hiveConf,
+        HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK, HiveSessionHook.class);
+    for (HiveSessionHook sessionHook : sessionHooks) {
+      sessionHook.run(new HiveSessionHookContextImpl(session));
+    }
+  }
+
+  public Future<?> submitBackgroundOperation(Runnable r) {
+    return backgroundOperationPool.submit(r);
+  }
+
 }
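
A minimal usage sketch (not part of the commit): submitBackgroundOperation() and the returned Future come from the patch above, while the sessionManager variable and the Runnable body are illustrative assumptions standing in for real query work.

    // Illustrative only: sessionManager is an initialized SessionManager.
    Future<?> backgroundHandle = sessionManager.submitBackgroundOperation(new Runnable() {
      @Override
      public void run() {
        // A long-running statement execution would run here,
        // off the Thrift request-handling thread.
      }
    });
    // The Future lets the caller cancel or wait on the background work:
    backgroundHandle.cancel(true);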

Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Sep 12 01:21:10 2013
@@ -200,8 +200,10 @@ public class ThriftCLIService extends Ab
       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
       String statement = req.getStatement();
       Map<String, String> confOverlay = req.getConfOverlay();
-      OperationHandle operationHandle =
-          cliService.executeStatement(sessionHandle, statement, confOverlay);
+      boolean runAsync = req.isRunAsync();
+      OperationHandle operationHandle = runAsync ?
+          cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+              : cliService.executeStatement(sessionHandle, statement, confOverlay);
       resp.setOperationHandle(operationHandle.toTOperationHandle());
       resp.setStatus(OK_STATUS);
     } catch (Exception e) {

Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Thu Sep 12 01:21:10 2013
@@ -122,9 +122,27 @@ public class ThriftCLIServiceClient exte
   public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
       Map<String, String> confOverlay)
           throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+   */
+  @Override
+  public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay)
+          throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+  }
+
+  private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay, boolean isAsync)
+          throws HiveSQLException {
     try {
-      TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
+      TExecuteStatementReq req =
+          new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
       req.setConfOverlay(confOverlay);
+      req.setRunAsync(isAsync);
       TExecuteStatementResp resp = cliService.ExecuteStatement(req);
       checkStatus(resp.getStatus());
       return new OperationHandle(resp.getOperationHandle());
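
A minimal caller-side sketch of the new API, assuming a connected client, an open sessionHandle, and a confOverlay map (all assumptions; only executeStatementAsync itself is introduced by this patch). The non-terminal state names mirror what the tests below poll for:

    OperationHandle handle =
        client.executeStatementAsync(sessionHandle, "SELECT id FROM t", confOverlay);

    // Poll until the operation leaves its non-terminal states.
    OperationState state = client.getOperationStatus(handle);
    while (state == OperationState.INITIALIZED || state == OperationState.RUNNING) {
      Thread.sleep(500);
      state = client.getOperationStatus(handle);
    }
    // state is now FINISHED, ERROR, CANCELED or CLOSED.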

Modified: hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Thu Sep 12 01:21:10 2013
@@ -113,4 +113,105 @@ public abstract class CLIServiceTest {
 
     client.closeSession(sessionHandle);
   }
+
+  @Test
+  public void testExecuteStatement() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    assertNotNull(sessionHandle);
+
+    // Change lock manager, otherwise unit-test doesn't go through
+    String setLockMgr = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+    String createTable = "CREATE TABLE TEST_EXEC(ID STRING)";
+    client.executeStatement(sessionHandle, createTable, confOverlay);
+
+    // blocking execute
+    String select = "SELECT ID FROM TEST_EXEC";
+    OperationHandle ophandle = client.executeStatement(sessionHandle, select, confOverlay);
+
+    // expect query to be completed now
+    assertEquals("Query should be finished",
+        OperationState.FINISHED, client.getOperationStatus(ophandle));
+  }
+
+  @Test
+  public void testExecuteStatementAsync() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    // Timeout for the poll in case of asynchronous execute
+    long pollTimeout = System.currentTimeMillis() + 100000;
+    assertNotNull(sessionHandle);
+    OperationState state = null;
+    OperationHandle ophandle;
+
+    // Change lock manager, otherwise unit-test doesn't go through
+    String setLockMgr = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+    String createTable = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
+    client.executeStatementAsync(sessionHandle, createTable, confOverlay);
+
+    // Test async execution response when query is malformed
+    String wrongQuery = "SELECT NAME FROM TEST_EXEC";
+    ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay);
+
+    int count = 0;
+    while (true) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        System.out.println("Polling timed out");
+        break;
+      }
+      state = client.getOperationStatus(ophandle);
+      System.out.println("Polling: " + ophandle + " count=" + (++count)
+          + " state=" + state);
+
+      if (state == OperationState.CANCELED || state == OperationState.CLOSED
+          || state == OperationState.FINISHED || state == OperationState.ERROR) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assertEquals("Query should return an error state",
+        OperationState.ERROR, client.getOperationStatus(ophandle));
+
+    // Test async execution when query is well formed
+    String select = "SELECT ID FROM TEST_EXEC_ASYNC";
+    ophandle =
+        client.executeStatementAsync(sessionHandle, select, confOverlay);
+
+    count = 0;
+    while (true) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        System.out.println("Polling timed out");
+        break;
+      }
+      state = client.getOperationStatus(ophandle);
+      System.out.println("Polling: " + ophandle + " count=" + (++count)
+          + " state=" + state);
+
+      if (state == OperationState.CANCELED || state == OperationState.CLOSED
+          || state == OperationState.FINISHED || state == OperationState.ERROR) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assertEquals("Query should be finished",
+        OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+    // Cancellation test
+    ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay);
+    System.out.println("cancelling " + ophandle);
+    client.cancelOperation(ophandle);
+    state = client.getOperationStatus(ophandle);
+    System.out.println(ophandle + " after cancelling, state= " + state);
+    assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+  }
 }
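
The two polling loops above are identical; a helper along these lines could replace both. A sketch only, not part of the commit, assuming the test class's client field is in scope:

    // Hypothetical helper: poll until the operation reaches a terminal state
    // or the deadline passes, then return the last observed state.
    private OperationState pollForTermination(OperationHandle handle, long deadlineMillis)
        throws Exception {
      OperationState state = client.getOperationStatus(handle);
      while (System.currentTimeMillis() < deadlineMillis
          && state != OperationState.FINISHED && state != OperationState.ERROR
          && state != OperationState.CANCELED && state != OperationState.CLOSED) {
        Thread.sleep(1000);
        state = client.getOperationStatus(handle);
      }
      return state;
    }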

Modified: hive/branches/vectorization/shims/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/build.xml?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/build.xml (original)
+++ hive/branches/vectorization/shims/build.xml Thu Sep 12 01:21:10 2013
@@ -100,8 +100,6 @@ to call at top-level: ant deploy-contrib
          destdir="${test.build.classes}"
          debug="${javac.debug}"
          optimize="${javac.optimize}"
-         target="${javac.version}"
-         source="${javac.version}"
          deprecation="${javac.deprecation}"
          includeantruntime="false">
           <compilerarg line="${javac.args} ${javac.args.warnings}" />

Modified: hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Sep 12 01:21:10 2013
@@ -735,5 +735,12 @@ public class Hadoop20Shims implements Ha
     throw new UnsupportedOperationException(
         "Kerberos not supported in current hadoop version");
   }
-
+  @Override
+  public HCatHadoopShims getHCatShim() {
+      throw new UnsupportedOperationException("HCatalog does not support Hadoop 0.20.x");
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+      throw new UnsupportedOperationException("WebHCat does not support Hadoop 0.20.x");
+  }
 }

Modified: hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Sep 12 01:21:10 2013
@@ -18,23 +18,33 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskLogServlet;
+import org.apache.hadoop.mapred.WebHCatJTShim20S;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
@@ -195,4 +205,129 @@ public class Hadoop20SShims extends Hado
       cluster.shutdown();
     }
   }
+  private volatile HCatHadoopShims hcatShimInstance;
+  @Override
+  public HCatHadoopShims getHCatShim() {
+    if(hcatShimInstance == null) {
+      hcatShimInstance = new HCatHadoopShims20S();
+    }
+    return hcatShimInstance;
+  }
+  private final class HCatHadoopShims20S implements HCatHadoopShims {
+    @Override
+    public TaskID createTaskID() {
+      return new TaskID();
+    }
+
+    @Override
+    public TaskAttemptID createTaskAttemptID() {
+      return new TaskAttemptID();
+    }
+
+    @Override
+    public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+      return new TaskAttemptContext(conf, taskId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+      org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor(
+                org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+                Progressable.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(conf, taskId, progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+                                       JobID jobId) {
+      return new JobContext(conf, jobId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+      org.apache.hadoop.mapred.JobContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor(
+                org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class,
+                Progressable.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.JobContext) construct.newInstance(conf, jobId, progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+      if (job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local")) {
+        try {
+          //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
+          //Calling it from here so that the partition publish happens.
+          //This call needs to be removed after MAPREDUCE-1447 is fixed.
+          outputFormat.getOutputCommitter(createTaskAttemptContext(
+                  job.getConfiguration(), createTaskAttemptID())).commitJob(job);
+        } catch (IOException e) {
+          throw new IOException("Failed to cleanup job", e);
+        } catch (InterruptedException e) {
+          throw new IOException("Failed to cleanup job", e);
+        }
+        }
+      }
+    }
+
+    @Override
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+      if (job.getConfiguration().get("mapred.job.tracker", "")
+              .equalsIgnoreCase("local")) {
+        try {
+          // This call needs to be removed after MAPREDUCE-1447 is fixed.
+          outputFormat.getOutputCommitter(createTaskAttemptContext(
+                  job.getConfiguration(), new TaskAttemptID())).abortJob(job, JobStatus.State.FAILED);
+        } catch (IOException e) {
+          throw new IOException("Failed to abort job", e);
+        } catch (InterruptedException e) {
+          throw new IOException("Failed to abort job", e);
+        }
+      }
+    }
+
+    @Override
+    public InetSocketAddress getResourceManagerAddress(Configuration conf) {
+      return JobTracker.getAddress(conf);
+    }
+
+    @Override
+    public String getPropertyName(PropertyName name) {
+      switch (name) {
+        case CACHE_ARCHIVES:
+          return DistributedCache.CACHE_ARCHIVES;
+        case CACHE_FILES:
+          return DistributedCache.CACHE_FILES;
+        case CACHE_SYMLINK:
+          return DistributedCache.CACHE_SYMLINK;
+      }
+
+      return "";
+    }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+      // In hadoop 1.x.x the file system URI is sufficient to determine the URI of the file
+      return "hdfs".equals(fs.getUri().getScheme());
+    }
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+    return new WebHCatJTShim20S(conf, ugi); // this has state, so can't be cached
+  }
 }
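
How calling code reaches these version-specific implementations: Hive's existing ShimLoader picks the HadoopShims matching the running Hadoop, and the new accessors hang off it. A sketch, with conf as an assumed Configuration:

    // Sketch: obtain version-appropriate MapReduce contexts via the shim layer.
    HadoopShims.HCatHadoopShims hcatShim = ShimLoader.getHadoopShims().getHCatShim();
    TaskAttemptID attemptId = hcatShim.createTaskAttemptID();
    org.apache.hadoop.mapreduce.TaskAttemptContext ctx =
        hcatShim.createTaskAttemptContext(conf, attemptId);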

Modified: hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Sep 12 01:21:10 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
 import java.lang.Integer;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map;
@@ -28,19 +29,27 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.WebHCatJTShim23;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.util.HostUtil;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
@@ -230,4 +239,104 @@ public class Hadoop23Shims extends Hadoo
       cluster.shutdown();
     }
   }
+  private volatile HCatHadoopShims hcatShimInstance;
+  @Override
+  public HCatHadoopShims getHCatShim() {
+    if(hcatShimInstance == null) {
+      hcatShimInstance = new HCatHadoopShims23();
+    }
+    return hcatShimInstance;
+  }
+  private final class HCatHadoopShims23 implements HCatHadoopShims {
+    @Override
+    public TaskID createTaskID() {
+      return new TaskID("", 0, TaskType.MAP, 0);
+    }
+
+    @Override
+    public TaskAttemptID createTaskAttemptID() {
+      return new TaskAttemptID("", 0, TaskType.MAP, 0, 0);
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+        org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
+      return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
+              conf instanceof JobConf ? new JobConf(conf) : conf,
+              taskId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+      org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor(
+                org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+                Reporter.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(
+                new JobConf(conf), taskId, (Reporter) progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+                                       JobID jobId) {
+      return new JobContextImpl(conf instanceof JobConf ? new JobConf(conf) : conf,
+              jobId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+              new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable);
+    }
+
+    @Override
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+      // Do nothing as this was fixed by MAPREDUCE-1447.
+    }
+
+    @Override
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+      // Do nothing as this was fixed by MAPREDUCE-1447.
+    }
+
+    @Override
+    public InetSocketAddress getResourceManagerAddress(Configuration conf) {
+      String addr = conf.get("yarn.resourcemanager.address", "localhost:8032");
+
+      return NetUtils.createSocketAddr(addr);
+    }
+
+    @Override
+    public String getPropertyName(PropertyName name) {
+      switch (name) {
+        case CACHE_ARCHIVES:
+          return MRJobConfig.CACHE_ARCHIVES;
+        case CACHE_FILES:
+          return MRJobConfig.CACHE_FILES;
+        case CACHE_SYMLINK:
+          return MRJobConfig.CACHE_SYMLINK;
+      }
+
+      return "";
+    }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+      // In case of viewfs we need to look up where the actual file is, to know the filesystem in use.
+      // resolvePath is a reliable way of finding which file system the file is on.
+      return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+    return new WebHCatJTShim23(conf, ugi); // this has state, so can't be cached
+  }
 }
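
The 0.20S and 0.23 shims differ in isFileInHDFS: under Hadoop 0.23 a viewfs mount can front several filesystems, so this version resolves the path before checking the scheme. A caller-side sketch, with conf and path as assumed values:

    // Sketch: a scheme check that stays correct under viewfs on Hadoop 0.23.
    FileSystem fs = path.getFileSystem(conf);
    boolean inHdfs = ShimLoader.getHadoopShims().getHCatShim().isFileInHDFS(fs, path);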

Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Sep 12 01:21:10 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,13 +41,19 @@ import org.apache.hadoop.mapred.ClusterS
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobProfile;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
@@ -461,4 +468,72 @@ public interface HadoopShims {
         Class<RecordReader<K, V>> rrClass) throws IOException;
   }
 
+  public HCatHadoopShims getHCatShim();
+  public interface HCatHadoopShims {
+
+    enum PropertyName {CACHE_ARCHIVES, CACHE_FILES, CACHE_SYMLINK}
+
+    public TaskID createTaskID();
+
+    public TaskAttemptID createTaskAttemptID();
+
+    public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+        TaskAttemptID taskId);
+
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
+
+    public JobContext createJobContext(Configuration conf, JobID jobId);
+
+    public org.apache.hadoop.mapred.JobContext createJobContext(JobConf conf, JobID jobId, Progressable progressable);
+
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException;
+
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException;
+
+    /* Referring to job tracker in 0.20 and resource manager in 0.23 */
+    public InetSocketAddress getResourceManagerAddress(Configuration conf);
+
+    public String getPropertyName(PropertyName name);
+
+    /**
+     * Checks if file is in HDFS filesystem.
+     *
+     * @param fs
+     * @param path
+     * @return true if the file is in HDFS, false if the file is in other file systems.
+     */
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+  }
+  /**
+   * Provides a Hadoop JobTracker shim.
+   * @param conf not {@code null}
+   */
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException;
+  public interface WebHCatJTShim {
+    /**
+     * Grab a handle to a job that is already known to the JobTracker.
+     *
+     * @return Profile of the job, or null if not found.
+     */
+    public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Grab a handle to a job that is already known to the JobTracker.
+     *
+     * @return Status of the job, or null if not found.
+     */
+    public JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Kill a job.
+     */
+    public void killJob(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Get all the jobs submitted.
+     */
+    public JobStatus[] getAllJobs() throws IOException;
+    /**
+     * Close the connection to the Job Tracker.
+     */
+    public void close();
+  }
 }
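
Because the comment on getWebHCatShim notes the returned shim holds connection state and is not cached, callers should close it when done. A minimal sketch, with conf and ugi as assumed values:

    // Sketch: query and release a JobTracker/ResourceManager connection.
    HadoopShims.WebHCatJTShim tracker =
        ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
    try {
      JobStatus[] all = tracker.getAllJobs(); // every job known to the cluster
    } finally {
      tracker.close(); // stateful and uncached, so release it explicitly
    }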

