Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Thu Sep 12 01:21:10 2013
@@ -19,9 +19,18 @@ package org.apache.hive.service.cli.session;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.SessionHandle;
@@ -32,11 +41,12 @@ import org.apache.hive.service.cli.opera
  *
  */
 public class SessionManager extends CompositeService {
-
+  private static final Log LOG = LogFactory.getLog(CompositeService.class);
   private HiveConf hiveConf;
   private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
   private OperationManager operationManager = new OperationManager();
   private static final Object sessionMapLock = new Object();
+  private ExecutorService backgroundOperationPool;
 
   public SessionManager() {
     super("SessionManager");
   }
@@ -45,10 +55,11 @@ public class SessionManager extends Comp
 
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-    operationManager = new OperationManager();
+    int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize);
+    backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
     addService(operationManager);
-
     super.init(hiveConf);
   }
@@ -62,6 +73,16 @@ public class SessionManager extends Comp
   public synchronized void stop() {
     // TODO
     super.stop();
+    if (backgroundOperationPool != null) {
+      backgroundOperationPool.shutdown();
+      long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      try {
+        backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
+      } catch (InterruptedException exc) {
+        LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
+            " seconds has been exceeded. RUNNING background operations will be shut down", exc);
+      }
+    }
   }
@@ -72,16 +93,15 @@ public class SessionManager extends Comp
 
   public SessionHandle openSession(String username, String password, Map<String, String> sessionConf,
       boolean withImpersonation, String delegationToken) throws HiveSQLException {
-    HiveSession session;
     if (username == null) {
       username = threadLocalUserName.get();
     }
-
+    HiveSession session;
     if (withImpersonation) {
-      HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password, sessionConf,
-          delegationToken);
-      session = (HiveSession)HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
-      hiveSessionUgi.setProxySession(session);
+      HiveSessionImplwithUGI hiveSessionUgi = new HiveSessionImplwithUGI(username, password,
+          sessionConf, delegationToken);
+      session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
+      hiveSessionUgi.setProxySession(session);
     } else {
       session = new HiveSessionImpl(username, password, sessionConf);
     }
@@ -90,6 +110,11 @@ public class SessionManager extends Comp
     synchronized(sessionMapLock) {
       handleToSession.put(session.getSessionHandle(), session);
     }
+    try {
+      executeSessionHooks(session);
+    } catch (Exception e) {
+      throw new HiveSQLException("Failed to execute session hooks", e);
+    }
     return session.getSessionHandle();
   }
@@ -150,4 +175,17 @@ public class SessionManager extends Comp
     threadLocalUserName.remove();
   }
 
+  // execute session hooks
+  private void executeSessionHooks(HiveSession session) throws Exception {
+    List<HiveSessionHook> sessionHooks = HookUtils.getHooks(hiveConf,
+        HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK, HiveSessionHook.class);
+    for (HiveSessionHook sessionHook : sessionHooks) {
+      sessionHook.run(new HiveSessionHookContextImpl(session));
+    }
+  }
+
+  public Future<?> submitBackgroundOperation(Runnable r) {
+    return backgroundOperationPool.submit(r);
+  }
+
 }
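The SessionManager change above adds two extension points: a fixed-size pool for background operations (submitBackgroundOperation) and session hooks that run on every openSession(). A minimal sketch of such a hook, assuming the HiveSessionHook and HiveSessionHookContext interfaces that HiveSessionHookContextImpl above implements against; the class name LoggingSessionHook and the getSessionUser() accessor are illustrative, not part of this commit:

    package example.hooks;

    import org.apache.hive.service.cli.HiveSQLException;
    import org.apache.hive.service.cli.session.HiveSessionHook;
    import org.apache.hive.service.cli.session.HiveSessionHookContext;

    public class LoggingSessionHook implements HiveSessionHook {
      @Override
      public void run(HiveSessionHookContext sessionHookContext) throws HiveSQLException {
        // executeSessionHooks() invokes this once per openSession(), after the
        // new session is registered in handleToSession; an exception thrown here
        // aborts session creation with "Failed to execute session hooks".
        System.out.println("HiveServer2 session opened for user "
            + sessionHookContext.getSessionUser());
      }
    }

A hook is activated by pointing the property behind HiveConf.ConfVars.HIVE_SERVER2_SESSION_HOOK at its fully qualified class name, which HookUtils.getHooks() then instantiates. Note that because openSession() wraps any hook failure in a HiveSQLException, a misbehaving hook blocks every new session.
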
Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Thu Sep 12 01:21:10 2013
@@ -200,8 +200,10 @@ public class ThriftCLIService extends Ab
       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
       String statement = req.getStatement();
       Map<String, String> confOverlay = req.getConfOverlay();
-      OperationHandle operationHandle =
-          cliService.executeStatement(sessionHandle, statement, confOverlay);
+      Boolean runAsync = req.isRunAsync();
+      OperationHandle operationHandle = runAsync ?
+          cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
+          : cliService.executeStatement(sessionHandle, statement, confOverlay);
       resp.setOperationHandle(operationHandle.toTOperationHandle());
       resp.setStatus(OK_STATUS);
     } catch (Exception e) {

Modified: hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/branches/vectorization/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Thu Sep 12 01:21:10 2013
@@ -122,9 +122,27 @@ public class ThriftCLIServiceClient exte
   public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
       Map<String, String> confOverlay)
           throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, false);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map)
+   */
+  @Override
+  public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay)
+          throws HiveSQLException {
+    return executeStatementInternal(sessionHandle, statement, confOverlay, true);
+  }
+
+  private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement,
+      Map<String, String> confOverlay, boolean isAsync)
+          throws HiveSQLException {
     try {
-      TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
+      TExecuteStatementReq req =
+          new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement);
       req.setConfOverlay(confOverlay);
+      req.setRunAsync(isAsync);
       TExecuteStatementResp resp = cliService.ExecuteStatement(req);
       checkStatus(resp.getStatus());
       return new OperationHandle(resp.getOperationHandle());

Modified: hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/vectorization/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Thu Sep 12 01:21:10 2013
@@ -113,4 +113,105 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+  @Test
+  public void testExecuteStatement() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    assertNotNull(sessionHandle);
+
+    // Change lock manager, otherwise unit-test doesn't go through
+    String setLockMgr = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+    String createTable = "CREATE TABLE TEST_EXEC(ID STRING)";
+    client.executeStatement(sessionHandle, createTable, confOverlay);
+
+    // blocking execute
+    String select = "SELECT ID FROM TEST_EXEC";
+    OperationHandle ophandle = client.executeStatement(sessionHandle, select, confOverlay);
+
+    // expect query to be completed now
+    assertEquals("Query should be finished",
+        OperationState.FINISHED, client.getOperationStatus(ophandle));
+  }
+
+  @Test
+  public void testExecuteStatementAsync() throws Exception {
+    HashMap<String, String> confOverlay = new HashMap<String, String>();
+    SessionHandle sessionHandle = client.openSession("tom", "password",
+        new HashMap<String, String>());
+    // Timeout for the poll in case of asynchronous execute
+    long pollTimeout = System.currentTimeMillis() + 100000;
+    assertNotNull(sessionHandle);
+    OperationState state = null;
+    OperationHandle ophandle;
+
+    // Change lock manager, otherwise unit-test doesn't go through
+    String setLockMgr = "SET hive.lock.manager=" +
+        "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager";
+    client.executeStatement(sessionHandle, setLockMgr, confOverlay);
+
+    String createTable = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
+    client.executeStatementAsync(sessionHandle, createTable, confOverlay);
+
+    // Test async execution response when query is malformed
+    String wrongQuery = "SELECT NAME FROM TEST_EXEC";
+    ophandle = client.executeStatementAsync(sessionHandle, wrongQuery, confOverlay);
+
+    int count = 0;
+    while (true) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        System.out.println("Polling timed out");
+        break;
+      }
+      state = client.getOperationStatus(ophandle);
+      System.out.println("Polling: " + ophandle + " count=" + (++count)
+          + " state=" + state);
+
+      if (OperationState.CANCELED == state || state == OperationState.CLOSED
+          || state == OperationState.FINISHED || state == OperationState.ERROR) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assertEquals("Query should return an error state",
+        OperationState.ERROR, client.getOperationStatus(ophandle));
+
+    // Test async execution when query is well formed
+    String select = "SELECT ID FROM TEST_EXEC_ASYNC";
+    ophandle =
+        client.executeStatementAsync(sessionHandle, select, confOverlay);
+
+    count = 0;
+    while (true) {
+      // Break if polling times out
+      if (System.currentTimeMillis() > pollTimeout) {
+        System.out.println("Polling timed out");
+        break;
+      }
+      state = client.getOperationStatus(ophandle);
+      System.out.println("Polling: " + ophandle + " count=" + (++count)
+          + " state=" + state);
+
+      if (OperationState.CANCELED == state || state == OperationState.CLOSED
+          || state == OperationState.FINISHED || state == OperationState.ERROR) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    assertEquals("Query should be finished",
+        OperationState.FINISHED, client.getOperationStatus(ophandle));
+
+    // Cancellation test
+    ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay);
+    System.out.println("cancelling " + ophandle);
+    client.cancelOperation(ophandle);
+    state = client.getOperationStatus(ophandle);
+    System.out.println(ophandle + " after cancelling, state= " + state);
+    assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+  }
 }

Modified: hive/branches/vectorization/shims/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/build.xml?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/build.xml (original)
+++ hive/branches/vectorization/shims/build.xml Thu Sep 12 01:21:10 2013
@@ -100,8 +100,6 @@ to call at top-level: ant deploy-contrib
            destdir="${test.build.classes}"
            debug="${javac.debug}"
            optimize="${javac.optimize}"
-           target="${javac.version}"
-           source="${javac.version}"
            deprecation="${javac.deprecation}"
            includeantruntime="false">
       <compilerarg line="${javac.args} ${javac.args.warnings}" />

Modified: hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/vectorization/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Sep 12 01:21:10 2013
@@ -735,5 +735,12 @@ public class Hadoop20Shims implements Ha
     throw new UnsupportedOperationException(
         "Kerberos not supported in current hadoop version");
   }
-
+  @Override
+  public HCatHadoopShims getHCatShim() {
+    throw new UnsupportedOperationException("HCatalog does not support Hadoop 0.20.x");
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+    throw new UnsupportedOperationException("WebHCat does not support Hadoop 0.20.x");
+  }
 }

Modified: hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/vectorization/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Sep 12 01:21:10 2013
@@ -18,23 +18,33 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskLogServlet;
+import org.apache.hadoop.mapred.WebHCatJTShim20S;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
@@ -195,4 +205,129 @@ public class Hadoop20SShims extends Hado
       cluster.shutdown();
     }
   }
+  private volatile HCatHadoopShims hcatShimInstance;
+  @Override
+  public HCatHadoopShims getHCatShim() {
+    if(hcatShimInstance == null) {
+      hcatShimInstance = new HCatHadoopShims20S();
+    }
+    return hcatShimInstance;
+  }
+  private final class HCatHadoopShims20S implements HCatHadoopShims {
+    @Override
+    public TaskID createTaskID() {
+      return new TaskID();
+    }
+
+    @Override
+    public TaskAttemptID createTaskAttemptID() {
+      return new TaskAttemptID();
+    }
+
+    @Override
+    public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+      return new TaskAttemptContext(conf, taskId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+      org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor(
+            org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+            Progressable.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+        JobID jobId) {
+      return new JobContext(conf, jobId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+      org.apache.hadoop.mapred.JobContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor(
+            org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class,
+            Progressable.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.JobContext)construct.newInstance(conf, jobId, progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+      if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+        try {
+          //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
+          //Calling it from here so that the partition publish happens.
+          //This call needs to be removed after MAPREDUCE-1447 is fixed.
+          outputFormat.getOutputCommitter(createTaskAttemptContext(
+              job.getConfiguration(), createTaskAttemptID())).commitJob(job);
+        } catch (IOException e) {
+          throw new IOException("Failed to cleanup job",e);
+        } catch (InterruptedException e) {
+          throw new IOException("Failed to cleanup job",e);
+        }
+      }
+    }
+
+    @Override
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+      if (job.getConfiguration().get("mapred.job.tracker", "")
+          .equalsIgnoreCase("local")) {
+        try {
+          // This call needs to be removed after MAPREDUCE-1447 is fixed.
+          outputFormat.getOutputCommitter(createTaskAttemptContext(
+              job.getConfiguration(), new TaskAttemptID())).abortJob(job, JobStatus.State.FAILED);
+        } catch (IOException e) {
+          throw new IOException("Failed to abort job", e);
+        } catch (InterruptedException e) {
+          throw new IOException("Failed to abort job", e);
+        }
+      }
+    }
+
+    @Override
+    public InetSocketAddress getResourceManagerAddress(Configuration conf)
+    {
+      return JobTracker.getAddress(conf);
+    }
+
+    @Override
+    public String getPropertyName(PropertyName name) {
+      switch (name) {
+        case CACHE_ARCHIVES:
+          return DistributedCache.CACHE_ARCHIVES;
+        case CACHE_FILES:
+          return DistributedCache.CACHE_FILES;
+        case CACHE_SYMLINK:
+          return DistributedCache.CACHE_SYMLINK;
+      }
+
+      return "";
+    }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+      // In hadoop 1.x.x the file system URI is sufficient to determine the uri of the file
+      return "hdfs".equals(fs.getUri().getScheme());
+    }
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+    return new WebHCatJTShim20S(conf, ugi);//this has state, so can't be cached
+  }
 }

Modified: hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/vectorization/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Sep 12 01:21:10 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
 import java.lang.Integer;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map;
@@ -28,19 +29,27 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.WebHCatJTShim23;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.util.HostUtil;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
@@ -230,4 +239,104 @@ public class Hadoop23Shims extends Hadoo
       cluster.shutdown();
     }
   }
+  private volatile HCatHadoopShims hcatShimInstance;
+  @Override
+  public HCatHadoopShims getHCatShim() {
+    if(hcatShimInstance == null) {
+      hcatShimInstance = new HCatHadoopShims23();
+    }
+    return hcatShimInstance;
+  }
+  private final class HCatHadoopShims23 implements HCatHadoopShims {
+    @Override
+    public TaskID createTaskID() {
+      return new TaskID("", 0, TaskType.MAP, 0);
+    }
+
+    @Override
+    public TaskAttemptID createTaskAttemptID() {
+      return new TaskAttemptID("", 0, TaskType.MAP, 0, 0);
+    }
+
+    @Override
+    public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+        org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
+      return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
+          conf instanceof JobConf? new JobConf(conf) : conf,
+          taskId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+      org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+      try {
+        java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor(
+            org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+            Reporter.class);
+        construct.setAccessible(true);
+        newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(
+            new JobConf(conf), taskId, (Reporter) progressable);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return newContext;
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+        JobID jobId) {
+      return new JobContextImpl(conf instanceof JobConf? new JobConf(conf) : conf,
+          jobId);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+        org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+      return new org.apache.hadoop.mapred.JobContextImpl(
+          new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable);
+    }
+
+    @Override
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+      // Do nothing as this was fixed by MAPREDUCE-1447.
+    }
+
+    @Override
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+      // Do nothing as this was fixed by MAPREDUCE-1447.
+    }
+
+    @Override
+    public InetSocketAddress getResourceManagerAddress(Configuration conf) {
+      String addr = conf.get("yarn.resourcemanager.address", "localhost:8032");
+
+      return NetUtils.createSocketAddr(addr);
+    }
+
+    @Override
+    public String getPropertyName(PropertyName name) {
+      switch (name) {
+        case CACHE_ARCHIVES:
+          return MRJobConfig.CACHE_ARCHIVES;
+        case CACHE_FILES:
+          return MRJobConfig.CACHE_FILES;
+        case CACHE_SYMLINK:
+          return MRJobConfig.CACHE_SYMLINK;
+      }
+
+      return "";
+    }
+
+    @Override
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+      // In case of viewfs we need to lookup where the actual file is to know the filesystem in use.
+      // resolvePath is a sure shot way of knowing which file system the file is.
+      return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+    }
+  }
+  @Override
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
+    return new WebHCatJTShim23(conf, ugi);//this has state, so can't be cached
+  }
 }

Modified: hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/vectorization/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Sep 12 01:21:10 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.shims;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -40,13 +41,19 @@ import org.apache.hadoop.mapred.ClusterS
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobProfile;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
@@ -461,4 +468,72 @@ public interface HadoopShims {
         Class<RecordReader<K, V>> rrClass) throws IOException;
   }
 
+  public HCatHadoopShims getHCatShim();
+  public interface HCatHadoopShims {
+
+    enum PropertyName {CACHE_ARCHIVES, CACHE_FILES, CACHE_SYMLINK}
+
+    public TaskID createTaskID();
+
+    public TaskAttemptID createTaskAttemptID();
+
+    public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+        TaskAttemptID taskId);
+
+    public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
+        org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
+
+    public JobContext createJobContext(Configuration conf, JobID jobId);
+
+    public org.apache.hadoop.mapred.JobContext createJobContext(JobConf conf, JobID jobId, Progressable progressable);
+
+    public void commitJob(OutputFormat outputFormat, Job job) throws IOException;
+
+    public void abortJob(OutputFormat outputFormat, Job job) throws IOException;
+
+    /* Referring to job tracker in 0.20 and resource manager in 0.23 */
+    public InetSocketAddress getResourceManagerAddress(Configuration conf);
+
+    public String getPropertyName(PropertyName name);
+
+    /**
+     * Checks if file is in HDFS filesystem.
+     *
+     * @param fs
+     * @param path
+     * @return true if the file is in HDFS, false if the file is in other file systems.
+     */
+    public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+  }
+  /**
+   * Provides a Hadoop JobTracker shim.
+   * @param conf not {@code null}
+   */
+  public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException;
+  public interface WebHCatJTShim {
+    /**
+     * Grab a handle to a job that is already known to the JobTracker.
+     *
+     * @return Profile of the job, or null if not found.
+     */
+    public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Grab a handle to a job that is already known to the JobTracker.
+     *
+     * @return Status of the job, or null if not found.
+     */
+    public JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Kill a job.
+     */
+    public void killJob(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+    /**
+     * Get all the jobs submitted.
+     */
+    public JobStatus[] getAllJobs() throws IOException;
+    /**
+     * Close the connection to the Job Tracker.
+     */
+    public void close();
+  }
 }
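
Taken together, the new HadoopShims entry points let HCatalog and WebHCat code stay Hadoop-version-neutral: getHCatShim() hides the JobContext/TaskAttemptContext API differences, while getWebHCatJTShim-style access to the job tracker goes through getWebHCatShim(). A caller-side sketch of the WebHCatJTShim contract, assuming the existing ShimLoader.getHadoopShims() helper selects Hadoop20SShims or Hadoop23Shims at runtime; ShimUsageSketch and listAllJobs are illustrative names, not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.mapred.JobStatus;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ShimUsageSketch {
      public static void listAllJobs(Configuration conf) throws IOException {
        HadoopShims shims = ShimLoader.getHadoopShims();
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // Not cached by the shims: each call opens a fresh connection to the
        // JobTracker (0.20S) or ResourceManager (0.23).
        HadoopShims.WebHCatJTShim tracker = shims.getWebHCatShim(conf, ugi);
        try {
          for (JobStatus status : tracker.getAllJobs()) {
            System.out.println(status.getJobID() + " state=" + status.getRunState());
          }
        } finally {
          tracker.close(); // release the connection the shim holds
        }
      }
    }

Per the "this has state, so can't be cached" comments in both shim implementations, the WebHCatJTShim object holds a live connection, which is why getWebHCatShim() constructs a new instance on every call and why callers should close() it when done.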
