Author: hashutosh
Date: Tue Oct 1 02:52:09 2013
New Revision: 1527856
URL: http://svn.apache.org/r1527856
Log:
Merged in with trunk
Added:
hive/branches/vectorization/hcatalog/bin/templeton.cmd
- copied unchanged from r1527855, hive/trunk/hcatalog/bin/templeton.cmd
Modified:
hive/branches/vectorization/ (props changed)
hive/branches/vectorization/hcatalog/build-support/checkstyle/apache_header.txt
hive/branches/vectorization/hcatalog/build.xml
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
Propchange: hive/branches/vectorization/
------------------------------------------------------------------------------
Merged /hive/trunk:r1527793-1527855
Modified:
hive/branches/vectorization/hcatalog/build-support/checkstyle/apache_header.txt
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/build-support/checkstyle/apache_header.txt?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/build-support/checkstyle/apache_header.txt
(original)
+++
hive/branches/vectorization/hcatalog/build-support/checkstyle/apache_header.txt
Tue Oct 1 02:52:09 2013
@@ -1,19 +1,19 @@
^#!
^<\?(xml|xml-stylesheet).*>$
^\W*$
-\W*Licensed to the Apache Software Foundation \(ASF\) under one$
-\W*or more contributor license agreements. See the NOTICE file$
-\W*distributed with this work for additional information$
-\W*regarding copyright ownership. The ASF licenses this file$
-\W*to you under the Apache License, Version 2.0 \(the$
-\W*"License"\); you may not use this file except in compliance$
-\W*with the License. You may obtain a copy of the License at$
-\W*$
-\W*http://www.apache.org/licenses/LICENSE-2.0$
-\W*$
-\W*Unless required by applicable law or agreed to in writing,$
-\W*software distributed under the License is distributed on an$
-\W*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY$
-\W*KIND, either express or implied. See the License for the$
-\W*specific language governing permissions and limitations$
-\W*under the License.$
+.*? Licensed to the Apache Software Foundation \(ASF\) under one$
+.*? or more contributor license agreements. See the NOTICE file$
+.*? distributed with this work for additional information$
+.*? regarding copyright ownership. The ASF licenses this file$
+.*? to you under the Apache License, Version 2.0 \(the$
+.*? "License"\); you may not use this file except in compliance$
+.*? with the License. You may obtain a copy of the License at$
+.*?$
+.*? http://www.apache.org/licenses/LICENSE-2.0$
+.*?$
+.*? Unless required by applicable law or agreed to in writing,$
+.*? software distributed under the License is distributed on an$
+.*? "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY$
+.*? KIND, either express or implied. See the License for the$
+.*? specific language governing permissions and limitations$
+.*? under the License.$
Modified: hive/branches/vectorization/hcatalog/build.xml
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/build.xml?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/build.xml (original)
+++ hive/branches/vectorization/hcatalog/build.xml Tue Oct 1 02:52:09 2013
@@ -363,6 +363,7 @@
<include name="hcat"/>
<include name="hcat.py"/>
<include name="hcatcfg.py"/>
+ <include name="templeton.cmd"/>
</fileset>
</copy>
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/config/webhcat-default.xml
Tue Oct 1 02:52:09 2013
@@ -80,32 +80,38 @@
</property>
<property>
+ <name>templeton.python</name>
+ <value>${env.PYTHON_CMD}</value>
+ <description>The path to the python executable.</description>
+ </property>
+
+ <property>
<name>templeton.pig.archive</name>
- <value>hdfs:///apps/templeton/pig-0.10.1.tar.gz</value>
+ <value></value>
<description>The path to the Pig archive.</description>
</property>
<property>
<name>templeton.pig.path</name>
- <value>pig-0.10.1.tar.gz/pig-0.10.1/bin/pig</value>
+ <value>pig-0.11.1.tar.gz/pig-0.11.1/bin/pig</value>
<description>The path to the Pig executable.</description>
</property>
<property>
<name>templeton.hcat</name>
- <value>${env.HCAT_PREFIX}/bin/hcat</value>
+ <value>${env.HCAT_PREFIX}/bin/hcat.py</value>
<description>The path to the hcatalog executable.</description>
</property>
<property>
<name>templeton.hive.archive</name>
- <value>hdfs:///apps/templeton/hive-0.10.0.tar.gz</value>
+ <value></value>
<description>The path to the Hive archive.</description>
</property>
<property>
<name>templeton.hive.path</name>
- <value>hive-0.10.0.tar.gz/hive-0.10.0/bin/hive</value>
+ <value>hive-0.11.0.tar.gz/hive-0.11.0/bin/hive</value>
<description>The path to the Hive executable.</description>
</property>
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
Tue Oct 1 02:52:09 2013
@@ -87,6 +87,7 @@ public class AppConfig extends Configura
public static final String HADOOP_NAME = "templeton.hadoop";
public static final String HADOOP_CONF_DIR = "templeton.hadoop.conf.dir";
public static final String HCAT_NAME = "templeton.hcat";
+ /** Config key for the path to the python executable (templeton.python). */
+ public static final String PYTHON_NAME = "templeton.python";
public static final String HIVE_ARCHIVE_NAME = "templeton.hive.archive";
public static final String HIVE_PATH_NAME = "templeton.hive.path";
public static final String HIVE_PROPS_NAME = "templeton.hive.properties";
@@ -181,6 +182,7 @@ public class AppConfig extends Configura
public String hadoopQueueName() { return get(HADOOP_QUEUE_NAME); }
public String clusterHadoop() { return get(HADOOP_NAME); }
public String clusterHcat() { return get(HCAT_NAME); }
+ /** Returns the configured python executable path, read from {@link #PYTHON_NAME}. */
+ public String clusterPython() { return get(PYTHON_NAME); }
public String pigPath() { return get(PIG_PATH_NAME); }
public String pigArchive() { return get(PIG_ARCHIVE_NAME); }
public String hivePath() { return get(HIVE_PATH_NAME); }
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/ExecServiceImpl.java
Tue Oct 1 02:52:09 2013
@@ -18,12 +18,18 @@
*/
package org.apache.hive.hcatalog.templeton;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Semaphore;
import org.apache.commons.exec.CommandLine;
@@ -33,6 +39,38 @@ import org.apache.commons.exec.ExecuteWa
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell;
+
+/**
+ * Thread that copies every line read from a child-process stream to an
+ * output stream, so the child's stdout/stderr are consumed while it runs.
+ */
+class StreamOutputWriter extends Thread
+{
+ private static final Log LOG = LogFactory.getLog(StreamOutputWriter.class);
+
+ InputStream is;
+ String type;
+ PrintWriter out;
+
+ /**
+ * @param is the stream to drain (e.g. a Process error or input stream)
+ * @param type label for the stream ("ERROR" / "OUTPUT"), used in log messages
+ * @param outStream destination; written through an auto-flushing PrintWriter
+ */
+ StreamOutputWriter(InputStream is, String type, OutputStream outStream)
+ {
+ this.is = is;
+ this.type = type;
+ this.out = new PrintWriter(outStream, true);
+ }
+
+ @Override
+ public void run()
+ {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ try
+ {
+ String line;
+ while ((line = br.readLine()) != null) {
+ out.println(line);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error copying " + type + " stream of child process", ioe);
+ } finally {
+ out.flush();
+ try {
+ br.close();
+ } catch (IOException ignored) {
+ // Nothing useful to do here; copying has already finished or failed.
+ }
+ }
+ }
+}
/**
* Execute a local program. This is a singleton service that will
@@ -45,6 +83,9 @@ public class ExecServiceImpl implements
private static volatile ExecServiceImpl theSingleton;
+ /**
+ * Windows CreateProcess synchronization object. Process launches on Windows
+ * are serialized on this lock so a child does not inherit handles intended
+ * for a concurrently launched child, which can hang reads of the output and
+ * error streams. See http://support.microsoft.com/kb/315939.
+ */
+ private static final Object WindowsProcessLaunchLock = new Object();
+
/**
* Retrieve the singleton.
*/
@@ -133,7 +174,54 @@ public class ExecServiceImpl implements
LOG.info("Running: " + cmd);
ExecBean res = new ExecBean();
- res.exitcode = executor.execute(cmd, execEnv(env));
+
+ if(Shell.WINDOWS){
+ // The default executor sometimes causes failures on Windows: the hcat
+ // command sometimes returns a non-zero exit status with it, apparently
+ // due to race conditions. Launch the process directly instead.
+ env = execEnv(env);
+ String[] envVals = new String[env.size()];
+ int i=0;
+ for( Entry<String, String> kv : env.entrySet()){
+ envVals[i++] = kv.getKey() + "=" + kv.getValue();
+ LOG.info("Setting " + kv.getKey() + "=" + kv.getValue());
+ }
+
+ Process proc;
+ synchronized (WindowsProcessLaunchLock) {
+ // To workaround the race condition issue with child processes
+ // inheriting unintended handles during process launch that can
+ // lead to hangs on reading output and error streams, we
+ // serialize process creation. More info available at:
+ // http://support.microsoft.com/kb/315939
+ proc = Runtime.getRuntime().exec(cmd.toStrings(), envVals);
+ }
+
+ //consume stderr
+ StreamOutputWriter errorGobbler = new
+ StreamOutputWriter(proc.getErrorStream(), "ERROR", errStream);
+
+ //consume stdout
+ StreamOutputWriter outputGobbler = new
+ StreamOutputWriter(proc.getInputStream(), "OUTPUT", outStream);
+
+ //start collecting input streams
+ errorGobbler.start();
+ outputGobbler.start();
+ //execute
+ try{
+ res.exitcode = proc.waitFor();
+ // The child exiting does not mean its output has been copied yet:
+ // wait for both gobblers to finish before errStream/outStream are
+ // read below, otherwise stdout/stderr can be truncated.
+ errorGobbler.join();
+ outputGobbler.join();
+ } catch (InterruptedException e) {
+ // Preserve the interrupt for callers up the stack.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ //flush
+ errorGobbler.out.flush();
+ outputGobbler.out.flush();
+ }
+ else {
+ res.exitcode = executor.execute(cmd, execEnv(env));
+ }
+
String enc = appConf.get(AppConfig.EXEC_ENCODING_NAME);
res.stdout = outStream.toString(enc);
res.stderr = errStream.toString(enc);
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HcatDelegator.java
Tue Oct 1 02:52:09 2013
@@ -67,7 +67,11 @@ public class HcatDelegator extends Launc
Map<String, String> env = TempletonUtils.hadoopUserEnv(user, cp);
proxy.addEnv(env);
proxy.addArgs(args);
- return execService.run(appConf.clusterHcat(), args, env);
+ if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+ return execService.run(appConf.clusterPython(), args, env);
+ } else {
+ return execService.run(appConf.clusterHcat(), args, env);
+ }
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
@@ -79,8 +83,12 @@ public class HcatDelegator extends Launc
private List<String> makeArgs(String exec, boolean format,
String group, String permissions) {
ArrayList<String> args = new ArrayList<String>();
+ if (appConf.clusterHcat().toLowerCase().endsWith(".py")) {
+ // hcat.py will become the first argument pass to command "python"
+ args.add(appConf.clusterHcat());
+ }
args.add("-e");
- args.add(exec);
+ args.add('"' + exec + '"');
if (TempletonUtils.isset(group)) {
args.add("-g");
args.add(group);
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
Tue Oct 1 02:52:09 2013
@@ -64,6 +64,7 @@ public class HiveDelegator extends Launc
try {
args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir,
completedUrl, enablelog));
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.hivePath());
args.add("--service");
@@ -75,16 +76,18 @@ public class HiveDelegator extends Launc
for (String prop : appConf.getStrings(AppConfig.HIVE_PROPS_NAME)) {
args.add("--hiveconf");
- args.add(prop);
+ args.add(TempletonUtils.quoteForWindows(prop));
}
for (String prop : defines) {
args.add("--hiveconf");
- args.add(prop);
+ args.add(TempletonUtils.quoteForWindows(prop));
+ }
+ for (String hiveArg : hiveArgs) {
+ args.add(TempletonUtils.quoteForWindows(hiveArg));
}
- args.addAll(hiveArgs);
if (TempletonUtils.isset(execute)) {
args.add("-e");
- args.add(execute);
+ args.add(TempletonUtils.quoteForWindows(execute));
} else if (TempletonUtils.isset(srcFile)) {
args.add("-f");
args.add(TempletonUtils.hadoopFsPath(srcFile, appConf, runAs)
@@ -120,8 +123,11 @@ public class HiveDelegator extends Launc
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
enablelog, JobType.HIVE));
- args.add("-archives");
- args.add(appConf.hiveArchive());
+ if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""))
+ {
+ args.add("-archives");
+ args.add(appConf.hiveArchive());
+ }
return args;
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
Tue Oct 1 02:52:09 2013
@@ -68,28 +68,34 @@ public class JarDelegator extends Launch
args.addAll(makeLauncherArgs(appConf, statusdir,
completedUrl, allFiles, enablelog, jobType));
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.clusterHadoop());
args.add("jar");
args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName());
if (TempletonUtils.isset(mainClass))
args.add(mainClass);
if (TempletonUtils.isset(libjars)) {
+ String libjarsListAsString =
+ TempletonUtils.hadoopFsListAsString(libjars, appConf, runAs);
args.add("-libjars");
- args.add(TempletonUtils.hadoopFsListAsString(libjars, appConf,
- runAs));
+ args.add(TempletonUtils.quoteForWindows(libjarsListAsString));
}
if (TempletonUtils.isset(files)) {
+ String filesListAsString =
+ TempletonUtils.hadoopFsListAsString(files, appConf, runAs);
args.add("-files");
- args.add(TempletonUtils.hadoopFsListAsString(files, appConf,
- runAs));
+ args.add(TempletonUtils.quoteForWindows(filesListAsString));
}
//the token file location comes after mainClass, as a -Dprop=val
args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
- for (String d : defines)
- args.add("-D" + d);
-
- args.addAll(jarArgs);
+ // Pass each define as a separate "-D" "<prop>=<val>" pair so the value
+ // can be quoted for cmd.exe processing on Windows.
+ for (String d : defines) {
+ args.add("-D");
+ args.add(TempletonUtils.quoteForWindows(d));
+ }
+ for (String arg : jarArgs) {
+ args.add(TempletonUtils.quoteForWindows(arg));
+ }
} catch (FileNotFoundException e) {
throw new BadParam(e.getMessage());
} catch (URISyntaxException e) {
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
Tue Oct 1 02:52:09 2013
@@ -70,18 +70,24 @@ public class PigDelegator extends Launch
}
args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
enablelog, JobType.PIG));
- args.add("-archives");
- args.add(appConf.pigArchive());
+ if (appConf.pigArchive() != null && !appConf.pigArchive().equals(""))
+ {
+ args.add("-archives");
+ args.add(appConf.pigArchive());
+ }
args.add("--");
+ TempletonUtils.addCmdForWindows(args);
args.add(appConf.pigPath());
//the token file location should be first argument of pig
args.add("-D" + TempletonControllerJob.TOKEN_FILE_ARG_PLACEHOLDER);
- args.addAll(pigArgs);
+ for (String pigArg : pigArgs) {
+ args.add(TempletonUtils.quoteForWindows(pigArg));
+ }
if (TempletonUtils.isset(execute)) {
args.add("-execute");
- args.add(execute);
+ args.add(TempletonUtils.quoteForWindows(execute));
} else if (TempletonUtils.isset(srcFile)) {
args.add("-file");
args.add(TempletonUtils.hadoopFsPath(srcFile, appConf, runAs)
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
Tue Oct 1 02:52:09 2013
@@ -587,7 +587,9 @@ public class Server {
@FormParam("output") String output,
@FormParam("mapper") String mapper,
@FormParam("reducer") String reducer,
- @FormParam("file") List<String> files,
+ @FormParam("combiner") String combiner,
+ @FormParam("file") List<String> fileList,
+ @FormParam("files") String files,
@FormParam("define") List<String> defines,
@FormParam("cmdenv") List<String> cmdenvs,
@FormParam("arg") List<String> args,
@@ -607,6 +609,8 @@ public class Server {
userArgs.put("output", output);
userArgs.put("mapper", mapper);
userArgs.put("reducer", reducer);
+ userArgs.put("combiner", combiner);
+ userArgs.put("file", fileList);
userArgs.put("files", files);
userArgs.put("define", defines);
userArgs.put("cmdenv", cmdenvs);
@@ -617,8 +621,8 @@ public class Server {
checkEnableLogPrerequisite(enablelog, statusdir);
StreamingDelegator d = new StreamingDelegator(appConf);
- return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer,
- files, defines, cmdenvs, args,
+ return d.run(getDoAsUser(), userArgs, inputs, output, mapper, reducer,
combiner,
+ fileList, files, defines, cmdenvs, args,
statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING);
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
Tue Oct 1 02:52:09 2013
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.exec.ExecuteException;
+import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
/**
* Submit a streaming job to the MapReduce queue. Really just a front
@@ -38,8 +39,9 @@ public class StreamingDelegator extends
public EnqueueBean run(String user, Map<String, Object> userArgs,
List<String> inputs, String output,
- String mapper, String reducer,
- List<String> files, List<String> defines,
+ String mapper, String reducer, String combiner,
+ List<String> fileList,
+ String files, List<String> defines,
List<String> cmdenvs,
List<String> jarArgs,
String statusdir,
@@ -49,13 +51,13 @@ public class StreamingDelegator extends
JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
- List<String> args = makeArgs(inputs, output, mapper, reducer,
- files, defines, cmdenvs, jarArgs);
+ List<String> args = makeArgs(inputs, output, mapper, reducer, combiner,
+ fileList, cmdenvs, jarArgs);
JarDelegator d = new JarDelegator(appConf);
return d.run(user, userArgs,
appConf.streamingJar(), null,
- null, null, args, defines,
+ null, files, args, defines,
statusdir, callback, completedUrl, enableLog, jobType);
}
@@ -63,10 +65,12 @@ public class StreamingDelegator extends
String output,
String mapper,
String reducer,
- List<String> files,
- List<String> defines,
+ String combiner,
+ List<String> fileList,
List<String> cmdenvs,
- List<String> jarArgs) {
+ List<String> jarArgs)
+ throws BadParam
+ {
ArrayList<String> args = new ArrayList<String>();
for (String input : inputs) {
args.add("-input");
@@ -79,13 +83,24 @@ public class StreamingDelegator extends
args.add("-reducer");
args.add(reducer);
- for (String f : files)
- args.add("-file" + f);
- for (String d : defines)
- args.add("-D" + d);
- for (String e : cmdenvs)
- args.add("-cmdenv" + e);
- args.addAll(jarArgs);
+ if (TempletonUtils.isset(combiner)) {
+ args.add("-combiner");
+ args.add(combiner);
+ }
+
+ for (String f : fileList) {
+ args.add("-file");
+ args.add(f);
+ }
+
+ for (String e : cmdenvs) {
+ args.add("-cmdenv");
+ args.add(TempletonUtils.quoteForWindows(e));
+ }
+
+ for (String arg : jarArgs) {
+ args.add(TempletonUtils.quoteForWindows(arg));
+ }
return args;
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSCleanup.java
Tue Oct 1 02:52:09 2013
@@ -98,7 +98,7 @@ public class HDFSCleanup extends Thread
// cycle fails, it'll try again on the next cycle.
try {
if (fs == null) {
- fs = FileSystem.get(appConf);
+ fs = new Path(storage_root).getFileSystem(appConf);
}
checkFiles(fs);
} catch (Exception e) {
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
Tue Oct 1 02:52:09 2013
@@ -210,7 +210,7 @@ public class HDFSStorage implements Temp
public void openStorage(Configuration config) throws IOException {
storage_root = config.get(TempletonStorage.STORAGE_ROOT);
if (fs == null) {
- fs = FileSystem.get(config);
+ fs = new Path(storage_root).getFileSystem(config);
}
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
Tue Oct 1 02:52:09 2013
@@ -48,12 +48,15 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import
org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.templeton.BadParam;
import org.apache.hive.hcatalog.templeton.LauncherDelegator;
/**
@@ -104,6 +107,9 @@ public class TempletonControllerJob exte
ArrayList<String> removeEnv = new ArrayList<String>();
removeEnv.add("HADOOP_ROOT_LOGGER");
+ removeEnv.add("hadoop-command");
+ removeEnv.add("CLASS");
+ removeEnv.add("mapredcommand");
Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
overrideClasspath);
List<String> jarArgsList = new
LinkedList<String>(Arrays.asList(jarArgs));
@@ -112,7 +118,15 @@ public class TempletonControllerJob exte
if (tokenFile != null) {
//Token is available, so replace the placeholder
+ tokenFile = tokenFile.replaceAll("\"", "");
String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+ if (Shell.WINDOWS) {
+ try {
+ tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+ } catch (BadParam e) {
+ throw new IOException("cannot pass " + tokenFile + " to
mapreduce.job.credentials.binary", e);
+ }
+ }
for(int i=0; i<jarArgsList.size(); i++){
String newArg =
jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
@@ -211,9 +225,9 @@ public class TempletonControllerJob exte
pool.execute(w);
}
+ // Submits a KeepAlive heartbeat task for the given task context to the pool
+ // and returns it, so the caller keeps a handle to the running task.
- private KeepAlive startCounterKeepAlive(ExecutorService pool, Context cnt)
+ private KeepAlive startCounterKeepAlive(ExecutorService pool, Context
context)
throws IOException {
- KeepAlive k = new KeepAlive(cnt);
+ KeepAlive k = new KeepAlive(context);
pool.execute(k);
return k;
}
@@ -297,20 +311,25 @@ public class TempletonControllerJob exte
}
}
- private static class KeepAlive implements Runnable {
- private final Mapper.Context cnt;
- private volatile boolean sendReport;
+ public static class KeepAlive implements Runnable {
+ private final Context context;
+ // volatile: run() polls this flag on the pool thread, while it can be
+ // cleared from another thread; without volatile the loop may never see it.
+ public volatile boolean sendReport;
- public KeepAlive(Mapper.Context cnt) {
- this.cnt = cnt;
+ public KeepAlive(Context context)
+ {
this.sendReport = true;
+ this.context = context;
}
@Override
public void run() {
try {
while (sendReport) {
- cnt.progress();
+ // Periodically report progress on the Context object
+ // to prevent TaskTracker from killing the Templeton
+ // Controller task
+ context.progress();
+ System.err.println("KeepAlive Heart beat");
Thread.sleep(KEEP_ALIVE_MSEC);
}
} catch (InterruptedException e) {
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
Tue Oct 1 02:52:09 2013
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -39,8 +40,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.hcatalog.templeton.UgiFactory;
+import org.apache.hive.hcatalog.templeton.BadParam;
/**
* General utility methods.
@@ -296,4 +299,46 @@ public class TempletonUtils {
return env;
}
+
+ // Add double quotes around the given input parameter if it is not already
+ // quoted. Quotes are not allowed in the middle of the parameter, and
+ // BadParam exception is thrown if this is the case.
+ //
+ // This method should be used to escape parameters before they get passed to
+ // Windows cmd scripts (specifically, special characters like a comma or an
+ // equal sign might be lost as part of the cmd script processing if not
+ // under quotes).
+ public static String quoteForWindows(String param) throws BadParam {
+ if (Shell.WINDOWS) {
+ if (param != null && param.length() > 0) {
+ String nonQuotedPart = param;
+ boolean addQuotes = true;
+ if (param.charAt(0) == '\"' && param.charAt(param.length() - 1) ==
'\"') {
+ if (param.length() < 2)
+ throw new BadParam("Passed in parameter is incorrectly quoted: " +
param);
+
+ addQuotes = false;
+ nonQuotedPart = param.substring(1, param.length() - 1);
+ }
+
+ // If we have any quotes other then the outside quotes, throw
+ if (nonQuotedPart.contains("\"")) {
+ throw new BadParam("Passed in parameter is incorrectly quoted: " +
param);
+ }
+
+ if (addQuotes) {
+ param = '\"' + param + '\"';
+ }
+ }
+ }
+ return param;
+ }
+
+ public static void addCmdForWindows(ArrayList<String> args) {
+ if(Shell.WINDOWS){
+ args.add("cmd");
+ args.add("/c");
+ args.add("call");
+ }
+ }
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
Tue Oct 1 02:52:09 2013
@@ -22,12 +22,16 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* Execute a local program. This is a singleton service that will
* execute a programs on the local box.
*/
public class TrivialExecService {
private static volatile TrivialExecService theSingleton;
+ private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
/**
* Retrieve the singleton.
@@ -41,11 +45,7 @@ public class TrivialExecService {
public Process run(List<String> cmd, List<String> removeEnv,
Map<String, String> environmentVariables)
throws IOException {
- System.err.println("templeton: starting " + cmd);
- System.err.print("With environment variables: ");
- for (Map.Entry<String, String> keyVal : environmentVariables.entrySet()) {
- System.err.println(keyVal.getKey() + "=" + keyVal.getValue());
- }
+ logDebugCmd(cmd, environmentVariables);
ProcessBuilder pb = new ProcessBuilder(cmd);
for (String key : removeEnv)
pb.environment().remove(key);
@@ -53,4 +53,20 @@ public class TrivialExecService {
return pb.start();
}
+ private void logDebugCmd(List<String> cmd,
+ Map<String, String> environmentVariables) {
+ if(!LOG.isDebugEnabled()){
+ return;
+ }
+ LOG.debug("starting " + cmd);
+ LOG.debug("With environment variables: " );
+ for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
+ LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
+ }
+ LOG.debug("With environment variables already set: " );
+ Map<String, String> env = System.getenv();
+ for (String envName : env.keySet()) {
+ LOG.debug(envName + "=" + env.get(envName));
+ }
+ }
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestServer.java
Tue Oct 1 02:52:09 2013
@@ -31,7 +31,7 @@ public class TestServer extends TestCase
MockServer server;
public void setUp() {
- new Main(null); // Initialize the config
+ new Main(new String[]{}); // Initialize the config
server = new MockServer();
}
Modified:
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java?rev=1527856&r1=1527855&r2=1527856&view=diff
==============================================================================
---
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
(original)
+++
hive/branches/vectorization/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTempletonUtils.java
Tue Oct 1 02:52:09 2013
@@ -159,6 +159,12 @@ public class TestTempletonUtils {
@Test
public void testHadoopFsFilename() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertEquals(null, TempletonUtils.hadoopFsFilename(null, null,
null));
Assert.assertEquals(null,
TempletonUtils.hadoopFsFilename(tmpFile.toURI().toString(), null,
null));
@@ -188,14 +194,22 @@ public class TestTempletonUtils {
@Test
public void testHadoopFsListAsArray() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsArray1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsArray2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(null, null, null)
== null);
- Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(
- tmpFile.toURI().toString() + "," + usrFile.toString(), null, null) ==
null);
- String[] tmp2 = TempletonUtils.hadoopFsListAsArray(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- new Configuration(), null);
- Assert.assertEquals(tmpFile.toURI().toString(), tmp2[0]);
- Assert.assertEquals(usrFile.toURI().toString(), tmp2[1]);
+ Assert.assertTrue(TempletonUtils.hadoopFsListAsArray(tmpFileName1 + ","
+ tmpFileName2,
+ null, null) == null);
+ String[] tmp2
+ = TempletonUtils.hadoopFsListAsArray(tmpFileName1 + "," + tmpFileName2,
+ new Configuration(), null);
+ Assert.assertEquals("file:" + tmpFileName1, tmp2[0]);
+ Assert.assertEquals("file:" + tmpFileName2, tmp2[1]);
+ tmpFile1.delete();
+ tmpFile2.delete();
} catch (FileNotFoundException e) {
Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
} catch (Exception e) {
@@ -218,15 +232,18 @@ public class TestTempletonUtils {
@Test
public void testHadoopFsListAsString() {
try {
+ String tmpFileName1 = "/tmp/testHadoopFsListAsString1";
+ String tmpFileName2 = "/tmp/testHadoopFsListAsString2";
+ File tmpFile1 = new File(tmpFileName1);
+ File tmpFile2 = new File(tmpFileName2);
+ tmpFile1.createNewFile();
+ tmpFile2.createNewFile();
Assert.assertTrue(TempletonUtils.hadoopFsListAsString(null, null, null)
== null);
- Assert.assertTrue(TempletonUtils.hadoopFsListAsString(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
+ Assert.assertTrue(TempletonUtils.hadoopFsListAsString("/tmp,/usr",
null, null) == null);
- Assert.assertEquals(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- TempletonUtils.hadoopFsListAsString(
- tmpFile.toURI().toString() + "," + usrFile.toURI().toString(),
- new Configuration(), null));
+ Assert.assertEquals("file:" + tmpFileName1 + ",file:" + tmpFileName2,
+ TempletonUtils.hadoopFsListAsString
+ (tmpFileName1 + "," + tmpFileName2, new Configuration(), null));
} catch (FileNotFoundException e) {
Assert.fail("Couldn't find name for " + tmpFile.toURI().toString());
} catch (Exception e) {