olga
Thu, 20 Mar 2008 14:33:00 -0700
Author: olga Date: Thu Mar 20 14:32:22 2008 New Revision: 639469 URL: http://svn.apache.org/viewvc?rev=639469&view=rev Log: PIG-18: changes to make pig work with hadoop 0.16 and HOD-0.4 Added: incubator/pig/trunk/lib/hadoop16.jar (with props) Removed: incubator/pig/trunk/bin/startHOD.expect incubator/pig/trunk/lib/hadoop14.jar Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/build.xml incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java incubator/pig/trunk/src/org/apache/pig/Main.java incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu Mar 20 14:32:22 2008 @@ -5,6 +5,8 @@ INCOMPATIBLE CHANGES + PIG-123: requires escape of '\' in chars and string + NEW FEATURES OPTIMIZATIONS @@ -169,6 +171,8 @@ PIG-106: Change StringBuffer and String '+' to StringBuilder (francisoud via gates). + + PIG-18: changes to make pig work with Hadoop 0.16 and HOD 0.4 (olgan) PIG-164: Fix memory issue in SpillableMemoryManager to partially clean the list of bags each time a new bag is added rather than waiting until the garbage Modified: incubator/pig/trunk/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/build.xml (original) +++ incubator/pig/trunk/build.xml Thu Mar 20 14:32:22 2008 @@ -40,7 +40,7 @@ <property name="dist.dir" value="${build.dir}/${final.name}" /> <property name="build.encoding" value="ISO-8859-1" /> <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore --> - <property name="hadoop.jarfile" value="hadoop15.jar" /> + <property name="hadoop.jarfile" value="hadoop16.jar" /> <!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version --> <property name="output.jarfile" value="${build.dir}/${final.name}.jar" /> Modified: incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java (original) +++ incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java Thu Mar 20 14:32:22 2008 @@ -30,6 +30,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketImpl; +import java.net.SocketOptions; import java.net.SocketImplFactory; import java.net.UnknownHostException; import java.net.Proxy.Type; @@ -434,9 +435,12 @@ protected void sendUrgentData(int data) throws IOException { throw new IOException("SSHSocketImpl does not implement sendUrgentData"); } - + @Override public Object getOption(int optID) throws SocketException { - throw new SocketException("SSHSocketImpl does not implement getOption"); + if (optID == SocketOptions.SO_SNDBUF) + return new Integer(1024); + else + throw new SocketException("SSHSocketImpl does not implement getOption for " + optID); } /** Added: incubator/pig/trunk/lib/hadoop16.jar URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop16.jar?rev=639469&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/pig/trunk/lib/hadoop16.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/trunk/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Main.java?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/Main.java (original) +++ incubator/pig/trunk/src/org/apache/pig/Main.java Thu Mar 20 14:32:22 2008 @@ -36,6 +36,7 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.cmdline.CmdLineParser; import org.apache.pig.tools.grunt.Grunt; import org.apache.pig.tools.timer.PerformanceTimerFactory; @@ -274,8 +275,12 @@ usage(); rc = 1; } catch (Throwable e) { - log.error(e); + //log.error(e); + // this is a hack to see full error till we resolve commons logging config + e.printStackTrace(); } finally { + // clear temp files + FileLocalizer.deleteTempFiles(); PerformanceTimerFactory.getPerfTimerFactory().dumpTimers(); System.exit(rc); } Modified: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java Thu Mar 20 14:32:22 2008 @@ -8,14 +8,14 @@ } public ExecException() { - this(null, null); + super(); } public ExecException(String message) { - this(message, null); + super(message); } public ExecException(Throwable cause) { - this(null, cause); + super(cause); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Mar 20 14:32:22 2008 @@ -1,7 +1,12 @@ package org.apache.pig.backend.hadoop.executionengine; +import java.io.File; import java.io.IOException; -import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.FileOutputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; @@ -43,6 +48,7 @@ public class HExecutionEngine implements ExecutionEngine { private final Log log = LogFactory.getLog(getClass()); + private static final String LOCAL = "local"; protected PigContext pigContext; @@ -120,14 +126,14 @@ } else { if (cluster != null && cluster.length() > 0) { - if(cluster.indexOf(':') < 0 && !cluster.equalsIgnoreCase("local")) { + if(cluster.indexOf(':') < 0 && !cluster.equalsIgnoreCase(LOCAL)) { cluster = cluster + ":50020"; } setJobtrackerLocation(cluster); } if (nameNode!=null && nameNode.length() > 0) { - if(nameNode.indexOf(':') < 0 && !nameNode.equalsIgnoreCase("local")) { + if(nameNode.indexOf(':') < 0 && !nameNode.equalsIgnoreCase(LOCAL)) { nameNode = nameNode + ":8020"; } setFilesystemLocation(nameNode); @@ -143,7 +149,7 @@ throw new ExecException("Failed to create DataStorage", e); } - if(cluster != null && !cluster.equalsIgnoreCase("local")){ + if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){ log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker")); try { @@ -166,7 +172,7 @@ } public void close() throws ExecException { - ; + closeHod(System.getProperty("hod.server")); } public Properties getConfiguration() throws ExecException { @@ -315,10 +321,15 @@ //To prevent doing hod if the pig server is constructed multiple times private static String hodMapRed; private static String hodHDFS; - - private enum ParsingState { - NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF - }; + private String hodConfDir = null; + private String remoteHodConfDir = null; + private Process hodProcess = null; + + class ShutdownThread extends Thread{ + public synchronized void run() { + closeHod(System.getProperty("hod.server")); + } + } private String[] doHod(String server) throws ExecException { if (hodMapRed != null) { @@ -326,125 +337,69 @@ } try { - Process p = null; - // Make the kryptonite released version the default if nothing - // else is specified. - StringBuilder cmd = new StringBuilder(); - cmd.append(System.getProperty("hod.expect.root")); - cmd.append('/'); - cmd.append("libexec/pig/"); - cmd.append(System.getProperty("hod.expect.uselatest")); - cmd.append('/'); - cmd.append(System.getProperty("hod.command")); - - String cluster = System.getProperty("yinst.cluster"); - - // TODO This is a Yahoo specific holdover, need to remove - // this. - if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) { - cmd.append(" --config="); - cmd.append(System.getProperty("hod.config.dir")); - cmd.append('/'); - cmd.append(cluster); - } + // first, create temp director to store the configuration + hodConfDir = createTempDir(server); + + // get the number of nodes out of the command or use default + StringBuilder hodParams = new StringBuilder(System.getProperty("hod.param", "")); + int nodes = getNumNodes(hodParams); + + // command format: hod allocate - d <cluster_dir> -n <number_of_nodes> <other params> + String[] cmdarray = new String[7]; + cmdarray[0] = "hod"; + cmdarray[1] = "allocate"; + cmdarray[2] = "-d"; + cmdarray[3] = hodConfDir; + cmdarray[4] = "-n"; + cmdarray[5] = Integer.toString(nodes); + cmdarray[6] = hodParams.toString(); - cmd.append(" " + System.getProperty("hod.param", "")); + log.info("Connecting to HOD..."); + log.debug("sending HOD command " + cmdToString(cmdarray)); - if (server.equals("local")) { - p = Runtime.getRuntime().exec(cmd.toString()); - } - else { - SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server); - p = fac.ssh(cmd.toString()); - } - - InputStream is = p.getInputStream(); + // setup shutdown hook to make sure we tear down hod connection + Runtime.getRuntime().addShutdownHook(new ShutdownThread()); - log.info("Connecting to HOD..."); - log.debug("sending HOD command " + cmd.toString()); + hodProcess = runCommand(server, cmdarray); + + // print all the information provided by HOD + try { + BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream())); + String msg; + while ((msg = br.readLine()) != null) + log.info(msg); + br.close(); + } catch(IOException ioe) {} + + // for remote connection we need to bring the file locally + if (!server.equals(LOCAL)) + hodConfDir = copyHadoopConfLocally(server); - StringBuilder sb = new StringBuilder(); - int c; - String hdfsUI = null; - String mapredUI = null; String hdfs = null; String mapred = null; - String hadoopConf = null; + String hadoopConf = hodConfDir + "/hadoop-site.xml"; - ParsingState current = ParsingState.NOTHING; + log.info ("Hadoop configuration file: " + hadoopConf); + + JobConf jobConf = new JobConf(hadoopConf); + jobConf.addResource("pig-cluster-hadoop-site.xml"); + conf = new HConfiguration(jobConf); + + hdfs = (String)conf.get("fs.default.name"); + if (hdfs == null) + throw new ExecException("Missing fs.default.name from hadoop configuration"); + log.info("HDFS: " + hdfs); + + mapred = (String)conf.get("mapred.job.tracker"); + if (mapred == null) + throw new ExecException("Missing mapred.job.tracker from hadoop configuration"); + log.info("JobTracker: " + mapred); - while((c = is.read()) != -1 && mapred == null) { - if (c == '\n' || c == '\r') { - switch(current) { - case HDFSUI: - hdfsUI = sb.toString().trim(); - log.info("HDFS Web UI: " + hdfsUI); - break; - case HDFS: - hdfs = sb.toString().trim(); - log.info("HDFS: " + hdfs); - break; - case MAPREDUI: - mapredUI = sb.toString().trim(); - log.info("JobTracker Web UI: " + mapredUI); - break; - case MAPRED: - mapred = sb.toString().trim(); - log.info("JobTracker: " + mapred); - break; - case HADOOPCONF: - hadoopConf = sb.toString().trim(); - log.info("HadoopConf: " + hadoopConf); - break; - } - current = ParsingState.NOTHING; - sb = new StringBuilder(); - } - sb.append((char)c); - if (sb.indexOf("hdfsUI:") != -1) { - current = ParsingState.HDFSUI; - sb = new StringBuilder(); - } - else if (sb.indexOf("hdfs:") != -1) { - current = ParsingState.HDFS; - sb = new StringBuilder(); - } - else if (sb.indexOf("mapredUI:") != -1) { - current = ParsingState.MAPREDUI; - sb = new StringBuilder(); - } - else if (sb.indexOf("mapred:") != -1) { - current = ParsingState.MAPRED; - sb = new StringBuilder(); - } - else if (sb.indexOf("hadoopConf:") != -1) { - current = ParsingState.HADOOPCONF; - sb = new StringBuilder(); - } - } - - hdfsUI = fixUpDomain(hdfsUI); hdfs = fixUpDomain(hdfs); - mapredUI = fixUpDomain(mapredUI); mapred = fixUpDomain(mapred); hodHDFS = hdfs; hodMapRed = mapred; - if (hadoopConf != null) { - JobConf jobConf = new JobConf(hadoopConf); - jobConf.addResource("pig-cluster-hadoop-site.xml"); - - conf = new HConfiguration(jobConf); - - // make sure that files on class path are used - System.out.println("Job Conf = " + conf); - System.out.println("dfs.block.size= " + conf.get("dfs.block.size")); - System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout")); - System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts")); - } - else { - throw new IOException("Missing Hadoop configuration file"); - } return new String[] {hdfs, mapred}; } catch (Exception e) { @@ -454,15 +409,240 @@ } } + private synchronized void closeHod(String server){ + if (hodProcess == null) + return; + + // hod deallocate format: hod deallocate -d <conf dir> + String[] cmdarray = new String[4]; + cmdarray[0] = "hod"; + cmdarray[1] = "deallocate"; + cmdarray[2] = "-d"; + if (remoteHodConfDir != null) + cmdarray[3] = remoteHodConfDir; + else + cmdarray[3] = hodConfDir; + + log.info("Disconnecting from HOD..."); + log.debug("Disconnect command: " + cmdToString(cmdarray)); + + try { + Process p = runCommand(server, cmdarray); + } catch (Exception e) { + log.warn("Failed to disconnect from HOD; error: " + e.getMessage()); + } finally { + if (remoteHodConfDir != null) + deleteDir(server, remoteHodConfDir); + deleteDir(LOCAL, hodConfDir); + } + + hodProcess = null; + } + + private String copyHadoopConfLocally(String server) throws ExecException { + String localDir = createTempDir(LOCAL); + String remoteFile = new String(hodConfDir + "/hadoop-site.xml"); + String localFile = new String(localDir + "/hadoop-site.xml"); + + remoteHodConfDir = hodConfDir; + + String[] cmdarray = new String[2]; + cmdarray[0] = "cat"; + cmdarray[1] = remoteFile; + + Process p = runCommand(server, cmdarray); + + BufferedWriter bw; + try { + bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile))); + } catch (Exception e){ + throw new ExecException("Failed to create local hadoop file " + localFile, e); + } + + try { + BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line; + while ((line = br.readLine()) != null){ + bw.write(line, 0, line.length()); + bw.newLine(); + } + br.close(); + bw.close(); + } catch (Exception e){ + throw new ExecException("Failed to copy data to local hadoop file " + localFile, e); + } + + return localDir; + } + + private String cmdToString(String[] cmdarray) { + StringBuilder cmd = new StringBuilder(); + + for (int i = 0; i < cmdarray.length; i++) { + cmd.append(cmdarray[i]); + cmd.append(' '); + } + + return cmd.toString(); + } + private Process runCommand(String server, String[] cmdarray) throws ExecException { + Process p; + try { + if (server.equals(LOCAL)) { + p = Runtime.getRuntime().exec(cmdarray); + } + else { + SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server); + p = fac.ssh(cmdToString(cmdarray)); + } + + //this should return as soon as connection is shutdown + int rc = p.waitFor(); + if (rc != 0) { + String errMsg = new String(); + try { + BufferedReader br = new BufferedReader(new InputStreamReader(p.getErrorStream())); + errMsg = br.readLine(); + br.close(); + } catch (IOException ioe) {} + StringBuilder msg = new StringBuilder("Failed to run command "); + msg.append(cmdToString(cmdarray)); + msg.append(" on server "); + msg.append(server); + msg.append("; return code: "); + msg.append(rc); + msg.append("; error: "); + msg.append(errMsg); + throw new ExecException(msg.toString()); + } + } catch (Exception e){ + throw new ExecException(e); + } + + return p; + } + + private void deleteDir(String server, String dir) { + if (server.equals(LOCAL)){ + File path = new File(dir); + deleteLocalDir(path); + } + else { + // send rm command over ssh + String[] cmdarray = new String[3]; + cmdarray[0] = "rm"; + cmdarray[1] = "-rf"; + cmdarray[2] = dir; + + try{ + Process p = runCommand(server, cmdarray); + }catch(Exception e){ + log.warn("Failed to remove HOD configuration directory - " + dir); + } + } + } + + private void deleteLocalDir(File path){ + File[] files = path.listFiles(); + int i; + for (i = 0; i < files.length; i++){ + if (files[i].isHidden()) + continue; + if (files[i].isFile()) + files[i].delete(); + else if (files[i].isDirectory()) + deleteLocalDir(files[i]); + } + + path.delete(); + } + private String fixUpDomain(String hostPort) throws UnknownHostException { String parts[] = hostPort.split(":"); if (parts[0].indexOf('.') == -1) { - parts[0] = parts[0] + ".inktomisearch.com"; + String domain = System.getProperty("cluster.domain"); + if (domain == null) + throw new RuntimeException("Missing cluster.domain property!"); + parts[0] = parts[0] + "." + domain; } InetAddress.getByName(parts[0]); return parts[0] + ":" + parts[1]; } - + + // create temp dir to store hod output; removed on exit + // format: <tempdir>/PigHod.<host name>.<user name>.<nanosecondts> + private String createTempDir(String server) throws ExecException { + StringBuilder tempDirPrefix = new StringBuilder (); + + if (server.equals(LOCAL)) + tempDirPrefix.append(System.getProperty("java.io.tmpdir")); + else + // for remote access we assume /tmp as temp dir + tempDirPrefix.append("/tmp"); + + tempDirPrefix.append("/PigHod."); + try { + tempDirPrefix.append(InetAddress.getLocalHost().getHostName()); + tempDirPrefix.append("."); + } catch (UnknownHostException e) {} + + tempDirPrefix.append(System.getProperty("user.name")); + tempDirPrefix.append("."); + String path; + do { + path = tempDirPrefix.toString() + System.nanoTime(); + } while (!createDir(server, path)); + + return path; + } + + private boolean createDir(String server, String dir) throws ExecException{ + if (server.equals(LOCAL)){ + // create local directory + File tempDir = new File(dir); + boolean success = tempDir.mkdir(); + if (!success) + log.warn("Failed to create HOD configuration directory - " + dir + ". Retrying ..."); + + return success; + } + else { + String[] cmdarray = new String[2]; + cmdarray[0] = "mkdir "; + cmdarray[1] = dir; + + try{ + Process p = runCommand(server, cmdarray); + } + catch(ExecException e){ + log.warn("Failed to create HOD configuration directory - " + dir + "Retrying..."); + return false; + } + + return true; + } + } + + // returns number of nodes based on -m option in hodParams if present; + // otherwise, default is used; -m is removed from the params + int getNumNodes(StringBuilder hodParams) { + String val = hodParams.toString(); + int startPos = val.indexOf("-m "); + if (startPos == -1) + startPos = val.indexOf("-m\t"); + if (startPos != -1) { + int curPos = startPos + 3; + int len = val.length(); + while (curPos < len && Character.isWhitespace(val.charAt(curPos))) curPos ++; + int numStartPos = curPos; + while (curPos < len && Character.isDigit(val.charAt(curPos))) curPos ++; + int nodes = Integer.parseInt(val.substring(numStartPos, curPos)); + hodParams.delete(startPos, curPos); + return nodes; + } else { + return Integer.getInteger("hod.nodes", 15); + } + } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=639469&r1=639468&r2=639469&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Mar 20 14:32:22 2008 @@ -251,20 +251,24 @@ initialized = true; relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt()); toDelete.push(relativeRoot); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - while (!toDelete.empty()) { - try { - ElementDescriptor elem = toDelete.pop(); - elem.delete(); - } - catch (IOException e) { - log.error(e); - } - } - } - }); + // Runtime.getRuntime().addShutdownHook(new Thread() { + // @Override + // public void run() { + // deleteTempFiles(); + // } + //}); + } + } + + public static void deleteTempFiles() { + while (!toDelete.empty()) { + try { + ElementDescriptor elem = toDelete.pop(); + elem.delete(); + } + catch (IOException e) { + log.error(e); + } } }