Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Fri Jan 10 11:59:46 2014 @@ -53,8 +53,8 @@ import org.apache.hama.pipes.protocol.St * */ public class PipesApplication<K1, V1, K2, V2, M extends Writable> { - private static final Log LOG = LogFactory.getLog(PipesApplication.class); + private static final int SERVER_SOCKET_TIMEOUT = 2000; private ServerSocket serverSocket; private Process process; private Socket clientSocket; @@ -85,11 +85,9 @@ public class PipesApplication<K1, V1, K2 // add TMPDIR environment variable with the value of java.io.tmpdir env.put("TMPDIR", System.getProperty("java.io.tmpdir")); - /* Set Logging Environment from Configuration */ + // Set Logging Environment from Configuration env.put("hama.pipes.logging", conf.getBoolean("hama.pipes.logging", false) ? "1" : "0"); - LOG.debug("DEBUG hama.pipes.logging: " - + conf.getBoolean("hama.pipes.logging", false)); return env; } @@ -213,7 +211,7 @@ public class PipesApplication<K1, V1, K2 if (!streamingEnabled) { LOG.debug("DEBUG: waiting for Client at " + serverSocket.getLocalSocketAddress()); - serverSocket.setSoTimeout(2000); + serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT); clientSocket = serverSocket.accept(); LOG.debug("DEBUG: Client connected! - start BinaryProtocol!"); @@ -234,7 +232,7 @@ public class PipesApplication<K1, V1, K2 br.close(); throw new SocketException( - "Timout: Client pipes application was not connecting!"); + "Timout: Client pipes application did not connect!"); } } @@ -284,7 +282,7 @@ public class PipesApplication<K1, V1, K2 } else { LOG.debug("DEBUG: waiting for Client at " + serverSocket.getLocalSocketAddress()); - serverSocket.setSoTimeout(2000); + serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT); clientSocket = serverSocket.accept(); LOG.debug("DEBUG: Client connected! - start BinaryProtocol!"); @@ -305,7 +303,7 @@ public class PipesApplication<K1, V1, K2 br.close(); throw new SocketException( - "Timout: Client pipes application was not connecting!"); + "Timout: Client pipes application did not connect!"); } }
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Jan 10 11:59:46 2014 @@ -19,8 +19,6 @@ package org.apache.hama.pipes; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSP; @@ -34,24 +32,27 @@ import org.apache.hama.bsp.sync.SyncExce public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable> extends BSP<K1, V1, K2, V2, BytesWritable> { - private static final Log LOG = LogFactory.getLog(PipesBSP.class); private PipesApplication<K1, V1, K2, V2, BytesWritable> application = new PipesApplication<K1, V1, K2, V2, BytesWritable>(); + private boolean applicationIsAlive = true; @Override public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer) throws IOException, SyncException, InterruptedException { - this.application.start(peer); + try { + this.application.start(peer); - this.application.getDownlink().runSetup(false, false); + this.application.getDownlink().runSetup(); - try { this.application.waitForFinish(); + } catch (IOException e) { - LOG.error(e); + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); + this.application.cleanup(false); + throw e; } } @@ -59,15 +60,20 @@ public class PipesBSP<K1 extends Writabl public void bsp(BSPPeer<K1, V1, K2, V2, BytesWritable> peer) throws IOException, SyncException, InterruptedException { - this.application.getDownlink().runBsp(false, false); - try { + this.application.getDownlink().runBsp(); + this.application.waitForFinish(); + } catch (IOException e) { - LOG.error(e); + applicationIsAlive = false; + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); + applicationIsAlive = false; + this.application.cleanup(false); + throw e; } } @@ -83,17 +89,24 @@ public class PipesBSP<K1 extends Writabl public void cleanup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer) throws IOException { - application.getDownlink().runCleanup(false, false); - try { - this.application.waitForFinish(); + if (applicationIsAlive) { + + this.application.getDownlink().runCleanup(); + + this.application.waitForFinish(); + + this.application.cleanup(true); + } } catch (IOException e) { - LOG.error(e); + applicationIsAlive = false; + this.application.cleanup(false); throw e; + } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - this.application.cleanup(true); + applicationIsAlive = false; + this.application.cleanup(false); + throw new IOException(e); } } Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Jan 10 11:59:46 2014 @@ -109,46 +109,6 @@ public class Submitter implements Tool { } /** - * Set whether the job is using a Java RecordReader. - * - * @param conf the configuration to modify - * @param value the new value - */ - public static void setIsJavaRecordReader(Configuration conf, boolean value) { - conf.setBoolean("hama.pipes.java.recordreader", value); - } - - /** - * Check whether the job is using a Java RecordReader - * - * @param conf the configuration to check - * @return is it a Java RecordReader? - */ - public static boolean getIsJavaRecordReader(Configuration conf) { - return conf.getBoolean("hama.pipes.java.recordreader", false); - } - - /** - * Set whether the job will use a Java RecordWriter. - * - * @param conf the configuration to modify - * @param value the new value to set - */ - public static void setIsJavaRecordWriter(Configuration conf, boolean value) { - conf.setBoolean("hama.pipes.java.recordwriter", value); - } - - /** - * Will the job use a Java RecordWriter? - * - * @param conf the configuration to check - * @return true, if the output of the job will be written by Java - */ - public static boolean getIsJavaRecordWriter(Configuration conf) { - return conf.getBoolean("hama.pipes.java.recordwriter", false); - } - - /** * Set the configuration, if it doesn't already have a value for the given * key. * @@ -237,8 +197,6 @@ public class Submitter implements Tool { setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job"); // DEBUG Output - LOG.debug("isJavaRecordReader: " - + getIsJavaRecordReader(job.getConfiguration())); LOG.debug("BspClass: " + job.getBspClass().getName()); // conf.setInputFormat(NLineInputFormat.class); LOG.debug("InputFormat: " + job.getInputFormat()); @@ -440,7 +398,6 @@ public class Submitter implements Tool { } if (results.hasOption("inputformat")) { - setIsJavaRecordReader(job.getConfiguration(), true); job.setInputFormat(getClass(results, "inputformat", conf, InputFormat.class)); } @@ -451,7 +408,6 @@ public class Submitter implements Tool { } if (results.hasOption("outputformat")) { - setIsJavaRecordWriter(job.getConfiguration(), true); job.setOutputFormat(getClass(results, "outputformat", conf, OutputFormat.class)); } Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Fri Jan 10 11:59:46 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.pipes.Submitter; @@ -61,6 +62,7 @@ public class BinaryProtocol<K1, V1, K2, public final Object hasTaskLock = new Object(); private boolean hasTask = false; + private Throwable uplinkException = null; public final Object resultLock = new Object(); private Integer resultInt = null; @@ -128,6 +130,10 @@ public class BinaryProtocol<K1, V1, K2, return new UplinkReader<K1, V1, K2, V2, M>(this, peer, in); } + public void setUplinkException(Throwable e) { + this.uplinkException = e; + } + public boolean isHasTask() { return this.hasTask; } @@ -223,36 +229,24 @@ public class BinaryProtocol<K1, V1, K2, } @Override - public void runSetup(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runSetup() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_SETUP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_SETUP"); } @Override - public void runBsp(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runBsp() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_BSP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_BSP"); } @Override - public void runCleanup(boolean pipedInput, boolean pipedOutput) - throws IOException { - + public void runCleanup() throws IOException { WritableUtils.writeVInt(this.outStream, MessageType.RUN_CLEANUP.code); - WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0); - WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0); flush(); setHasTask(true); LOG.debug("Sent MessageType.RUN_CLEANUP"); @@ -279,7 +273,7 @@ public class BinaryProtocol<K1, V1, K2, synchronized (this.resultLock) { try { while (resultInt == null) { - this.resultLock.wait(); + this.resultLock.wait(); // this call blocks } resultVal = resultInt; @@ -329,17 +323,19 @@ public class BinaryProtocol<K1, V1, K2, @Override public boolean waitForFinish() throws IOException, InterruptedException { - // LOG.debug("waitForFinish... "+hasTask); + // LOG.debug("waitForFinish... " + hasTask); synchronized (this.hasTaskLock) { - try { - while (this.hasTask) - this.hasTaskLock.wait(); - } catch (InterruptedException e) { - LOG.error(e); + while (this.hasTask) { + this.hasTaskLock.wait(); // this call blocks } - } + // Check if UplinkReader thread has thrown exception + if (uplinkException != null) { + throw new InterruptedException( + StringUtils.stringifyException(uplinkException)); + } + } return hasTask; } Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Fri Jan 10 11:59:46 2014 @@ -58,29 +58,23 @@ public interface DownwardProtocol<K1, V1 /** * runSetup * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException; + void runSetup() throws IOException; /** * runBsp * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException; + void runBsp() throws IOException; /** * runCleanup * - * @param pipedInput use pipedInput - * @param pipedOutput use pipedOutput * @throws IOException */ - void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException; + void runCleanup() throws IOException; /** * getPartition Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Fri Jan 10 11:59:46 2014 @@ -244,15 +244,13 @@ public class StreamingProtocol<K1 extend } @Override - public void runSetup(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runSetup() throws IOException { writeLine(MessageType.RUN_SETUP, null); waitOnAck(); } @Override - public void runBsp(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runBsp() throws IOException { writeLine(MessageType.RUN_BSP, null); waitOnAck(); } @@ -269,8 +267,7 @@ public class StreamingProtocol<K1 extend } @Override - public void runCleanup(boolean pipedInput, boolean pipedOutput) - throws IOException { + public void runCleanup() throws IOException { writeLine(MessageType.RUN_CLEANUP, null); waitOnAck(); } Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Fri Jan 10 11:59:46 2014 @@ -24,8 +24,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.AbstractMap; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -55,12 +56,14 @@ public class UplinkReader<KEYIN, VALUEIN private BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binProtocol; private BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer = null; private Configuration conf; + private FileSystem fs; protected DataInputStream inStream; protected DataOutputStream outStream; private Map<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>> sequenceFileReaders; private Map<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>> sequenceFileWriters; + private Set<String> sequenceFileWriterPaths; public UplinkReader( BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol, @@ -68,6 +71,7 @@ public class UplinkReader<KEYIN, VALUEIN this.binProtocol = binaryProtocol; this.conf = conf; + this.fs = FileSystem.get(conf); this.inStream = new DataInputStream(new BufferedInputStream(stream, BinaryProtocol.BUFFER_SIZE)); @@ -76,6 +80,7 @@ public class UplinkReader<KEYIN, VALUEIN this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>>(); this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>>(); + this.sequenceFileWriterPaths = new HashSet<String>(); } public UplinkReader( @@ -99,10 +104,9 @@ public class UplinkReader<KEYIN, VALUEIN } int cmd = readCommand(); - if (cmd == -1) { - continue; - } - LOG.debug("Handling uplink command: " + MessageType.values()[cmd]); + LOG.debug("Handling uplink command: " + cmd); + // MessageType.values()[cmd] may cause NullPointerException (bad + // command) if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING writeKeyValue(); @@ -165,10 +169,11 @@ public class UplinkReader<KEYIN, VALUEIN } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING partitionResponse(); } else { - throw new IOException("Bad command code: " + cmd); + throw new Exception("Bad command code: " + cmd); } } catch (InterruptedException e) { + onError(e); return; } catch (Throwable e) { onError(e); @@ -180,6 +185,13 @@ public class UplinkReader<KEYIN, VALUEIN // onError is overwritten by StreamingProtocol in Hama Streaming protected void onError(Throwable e) { LOG.error(StringUtils.stringifyException(e)); + + // notify binaryProtocol and set Exception + synchronized (binProtocol.hasTaskLock) { + binProtocol.setUplinkException(e); + binProtocol.setHasTask(false); + binProtocol.hasTaskLock.notify(); + } } // readCommand is overwritten by StreamingProtocol in Hama Streaming @@ -188,7 +200,20 @@ public class UplinkReader<KEYIN, VALUEIN } public void closeConnection() throws IOException { + // close input stream this.inStream.close(); + + // close open SequenceFileReaders + for (int fileID : this.sequenceFileReaders.keySet()) { + LOG.debug("close SequenceFileReader: " + fileID); + this.sequenceFileReaders.get(fileID).getKey().close(); + } + + // close open SequenceFileWriters + for (int fileID : this.sequenceFileWriters.keySet()) { + LOG.debug("close SequenceFileWriter: " + fileID); + this.sequenceFileWriters.get(fileID).getKey().close(); + } } public void reopenInput() throws IOException { @@ -261,14 +286,15 @@ public class UplinkReader<KEYIN, VALUEIN public void getAllPeerNames() throws IOException { LOG.debug("Got MessageType.GET_ALL_PEERNAME"); + String[] peerNames = peer.getAllPeerNames(); WritableUtils.writeVInt(this.outStream, MessageType.GET_ALL_PEERNAME.code); - WritableUtils.writeVInt(this.outStream, peer.getAllPeerNames().length); - for (String s : peer.getAllPeerNames()) { + WritableUtils.writeVInt(this.outStream, peerNames.length); + for (String s : peerNames) { Text.writeString(this.outStream, s); } binProtocol.flush(); LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: " - + peer.getAllPeerNames().length); + + peerNames.length); } public void sync() throws IOException, SyncException, InterruptedException { @@ -283,15 +309,18 @@ public class UplinkReader<KEYIN, VALUEIN public void getMessage() throws IOException { LOG.debug("Got MessageType.GET_MSG"); - WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code); Writable message = peer.getCurrentMessage(); if (message != null) { + WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code); binProtocol.writeObject(message); + LOG.debug("Responded MessageType.GET_MSG - Message: " + + ((message.toString().length() < 10) ? message.toString() : message + .toString().substring(0, 9) + "...")); + } else { + WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); + LOG.debug("Responded MessageType.END_OF_DATA"); } binProtocol.flush(); - LOG.debug("Responded MessageType.GET_MSG - Message: " - + ((message.toString().length() < 10) ? message.toString() : message - .toString().substring(0, 9) + "...")); } public void getMessageCount() throws IOException { @@ -408,7 +437,7 @@ public class UplinkReader<KEYIN, VALUEIN WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code); binProtocol.flush(); LOG.debug("Responded MessageType.WRITE_KEYVALUE"); - + LOG.debug("Done MessageType.WRITE_KEYVALUE -" + " Key: " + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut @@ -432,31 +461,43 @@ public class UplinkReader<KEYIN, VALUEIN int fileID = -1; - FileSystem fs = FileSystem.get(conf); if (option.equals("r")) { SequenceFile.Reader reader; try { reader = new SequenceFile.Reader(fs, new Path(path), conf); - // try to load key and value class - Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); - Class<?> sequenceValueClass = conf.getClassLoader().loadClass( - valueClass); - - // try to instantiate key and value class - Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance( - sequenceKeyClass, conf); - Writable sequenceValueWritable = (Writable) ReflectionUtils - .newInstance(sequenceValueClass, conf); - - // put new fileID and key and value Writable instances into HashMap - fileID = reader.hashCode(); - sequenceFileReaders - .put( - fileID, - new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>( - reader, new AbstractMap.SimpleEntry<Writable, Writable>( - sequenceKeyWritable, sequenceValueWritable))); + if (reader.getKeyClassName().equals(keyClass) + && reader.getValueClassName().equals(valueClass)) { + // try to load key and value class + Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); + Class<?> sequenceValueClass = conf.getClassLoader().loadClass( + valueClass); + + // try to instantiate key and value class + Writable sequenceKeyWritable = (Writable) ReflectionUtils + .newInstance(sequenceKeyClass, conf); + Writable sequenceValueWritable = (Writable) ReflectionUtils + .newInstance(sequenceValueClass, conf); + + // put new fileID and key and value Writable instances into HashMap + fileID = reader.hashCode(); + this.sequenceFileReaders + .put( + fileID, + new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>( + reader, new AbstractMap.SimpleEntry<Writable, Writable>( + sequenceKeyWritable, sequenceValueWritable))); + + } else { // keyClass or valueClass is wrong + fileID = -1; + if (!reader.getKeyClassName().equals(keyClass)) { + LOG.error("SEQFILE_OPEN - Wrong KeyClass: " + keyClass + + " File KeyClass: " + reader.getKeyClassName()); + } else { + LOG.error("SEQFILE_OPEN - Wrong ValueClass: " + valueClass + + " File ValueClass: " + reader.getValueClassName()); + } + } } catch (IOException e) { LOG.error("SEQFILE_OPEN - " + e.getMessage()); @@ -469,29 +510,42 @@ public class UplinkReader<KEYIN, VALUEIN } else if (option.equals("w")) { SequenceFile.Writer writer; try { - - // try to load key and value class - Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); - Class<?> sequenceValueClass = conf.getClassLoader().loadClass( - valueClass); - - writer = new SequenceFile.Writer(fs, conf, new Path(path), - sequenceKeyClass, sequenceValueClass); - - // try to instantiate key and value class - Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance( - sequenceKeyClass, conf); - Writable sequenceValueWritable = (Writable) ReflectionUtils - .newInstance(sequenceValueClass, conf); - - // put new fileID and key and value Writable instances into HashMap - fileID = writer.hashCode(); - sequenceFileWriters - .put( - fileID, - new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>( - writer, new AbstractMap.SimpleEntry<Writable, Writable>( - sequenceKeyWritable, sequenceValueWritable))); + // SequenceFile.Writer has an exclusive lease for a file + // No other client can write to this file until other Writer has + // completed + if (!this.sequenceFileWriterPaths.contains(path)) { + + // try to load key and value class + Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass); + Class<?> sequenceValueClass = conf.getClassLoader().loadClass( + valueClass); + + // try to instantiate key and value class + Writable sequenceKeyWritable = (Writable) ReflectionUtils + .newInstance(sequenceKeyClass, conf); + Writable sequenceValueWritable = (Writable) ReflectionUtils + .newInstance(sequenceValueClass, conf); + + writer = new SequenceFile.Writer(fs, conf, new Path(path), + sequenceKeyClass, sequenceValueClass); + + // put new fileID and key and value Writable instances into HashMap + fileID = writer.hashCode(); + this.sequenceFileWriters + .put( + fileID, + new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>( + writer, new AbstractMap.SimpleEntry<Writable, Writable>( + sequenceKeyWritable, sequenceValueWritable))); + + // add path to set (exclusive access) + this.sequenceFileWriterPaths.add(path); + + } else { // Path was already opened by another SequenceFile.Writer + fileID = -1; + LOG.error("SEQFILE_OPEN - Path: " + path + + " is already used by another Writer!"); + } } catch (IOException e) { LOG.error("SEQFILE_OPEN - " + e.getMessage()); @@ -515,7 +569,7 @@ public class UplinkReader<KEYIN, VALUEIN LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID); // check if fileID is available in sequenceFileReader - if (sequenceFileReaders.containsKey(fileID)) { + if (this.sequenceFileReaders.containsKey(fileID)) { Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue() .getKey(); @@ -523,7 +577,7 @@ public class UplinkReader<KEYIN, VALUEIN .getValue().getValue(); // try to read next key/value pair from SequenceFile.Reader - if (sequenceFileReaders.get(fileID).getKey() + if (this.sequenceFileReaders.get(fileID).getKey() .next(sequenceKeyWritable, sequenceValueWritable)) { WritableUtils.writeVInt(this.outStream, @@ -542,14 +596,14 @@ public class UplinkReader<KEYIN, VALUEIN + "...")); } else { // false when at end of file - WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA"); } binProtocol.flush(); } else { // no fileID stored - LOG.warn("SequenceFileReader: FileID " + fileID + " not found!"); + LOG.error("MessageType.SEQFILE_READNEXT: FileID " + fileID + + " not found!"); WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code); LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA"); binProtocol.flush(); @@ -563,7 +617,7 @@ public class UplinkReader<KEYIN, VALUEIN boolean result = false; // check if fileID is available in sequenceFileWriter - if (sequenceFileWriters.containsKey(fileID)) { + if (this.sequenceFileWriters.containsKey(fileID)) { Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue() .getKey(); @@ -577,7 +631,7 @@ public class UplinkReader<KEYIN, VALUEIN if ((sequenceKeyWritable != null) && (sequenceValueWritable != null)) { // append to sequenceFile - sequenceFileWriters.get(fileID).getKey() + this.sequenceFileWriters.get(fileID).getKey() .append(sequenceKeyWritable, sequenceValueWritable); LOG.debug("Stored data: Key: " @@ -591,6 +645,13 @@ public class UplinkReader<KEYIN, VALUEIN result = true; } + } else { // no fileID stored + + // Skip written data from InputStream + int availableBytes = this.inStream.available(); + this.inStream.skip(availableBytes); + LOG.debug("MessageType.SEQFILE_APPEND: skip " + availableBytes + " bytes"); + LOG.error("MessageType.SEQFILE_APPEND: FileID " + fileID + " not found!"); } // RESPOND @@ -606,12 +667,16 @@ public class UplinkReader<KEYIN, VALUEIN boolean result = false; - if (sequenceFileReaders.containsKey(fileID)) { - sequenceFileReaders.get(fileID).getKey().close(); + if (this.sequenceFileReaders.containsKey(fileID)) { + this.sequenceFileReaders.get(fileID).getKey().close(); + this.sequenceFileReaders.remove(fileID); result = true; - } else if (sequenceFileWriters.containsKey(fileID)) { - sequenceFileWriters.get(fileID).getKey().close(); + } else if (this.sequenceFileWriters.containsKey(fileID)) { + this.sequenceFileWriters.get(fileID).getKey().close(); + this.sequenceFileWriters.remove(fileID); result = true; + } else { // no fileID stored + LOG.error("MessageType.SEQFILE_CLOSE: FileID " + fileID + " not found!"); } // RESPOND @@ -663,9 +728,6 @@ public class UplinkReader<KEYIN, VALUEIN } else if (obj instanceof LongWritable) { ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream)); - } else if (obj instanceof NullWritable) { - throw new IOException("Cannot read data into NullWritable!"); - } else { try { LOG.debug("reading type: " + obj.getClass().getName()); Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Fri Jan 10 11:59:46 2014 @@ -107,6 +107,10 @@ public class SequenceFileDumper { Path path = new Path(cmdLine.getOptionValue("file")); FileSystem fs = FileSystem.get(path.toUri(), conf); + if (!fs.isFile(path)) { + System.out.println("File does not exist: " + path.toString()); + return; + } SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); Writer writer; Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1557100&r1=1557099&r2=1557100&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Fri Jan 10 11:59:46 2014 @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hama.Constants; @@ -44,6 +45,7 @@ import org.apache.hama.bsp.ClusterStatus import org.apache.hama.bsp.FileInputFormat; import org.apache.hama.bsp.FileOutputFormat; import org.apache.hama.bsp.KeyValueTextInputFormat; +import org.apache.hama.bsp.NullInputFormat; import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; import org.apache.hama.bsp.message.MessageManager; @@ -62,6 +64,7 @@ public class TestPipes extends HamaClust public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install"; public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation"; + public static final String EXAMPLE_PIESTIMATOR_EXEC = "/examples/piestimator"; public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication"; public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/"; public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/"; @@ -70,6 +73,7 @@ public class TestPipes extends HamaClust private HamaConfiguration configuration; private static FileSystem fs = null; + private String examplesInstallPath; public TestPipes() { configuration = new HamaConfiguration(); @@ -118,21 +122,25 @@ public class TestPipes extends HamaClust + " is empty! Skipping TestPipes!"); return; } + this.examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); // *** Summation Test *** summation(); + // *** PiEstimator Test *** + piestimation(); + // *** MatrixMultiplication Test *** matrixMult(); - + // Remove local temp folder cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT)); } private void summation() throws Exception { // Setup Paths - String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); - Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC); + Path summationExec = new Path(this.examplesInstallPath + + EXAMPLE_SUMMATION_EXEC); Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/in"); Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/out"); @@ -144,15 +152,36 @@ public class TestPipes extends HamaClust outputPath, 1, this.numOfGroom); // Verify output - verifySummationOutput(configuration, outputPath, sum); + verifyOutput(configuration, outputPath, sum.doubleValue(), + Math.pow(10, (DOUBLE_PRECISION * -1))); + + // Clean input and output folder + cleanup(fs, inputPath); + cleanup(fs, outputPath); + } + + private void piestimation() throws Exception { + // Setup Paths + Path piestimatorExec = new Path(this.examplesInstallPath + + EXAMPLE_PIESTIMATOR_EXEC); + Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/in"); + Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/out"); + + // Run PiEstimator example + runProgram(getPiestimatorJob(configuration), piestimatorExec, inputPath, + outputPath, 3, this.numOfGroom); + + // Verify output + verifyOutput(configuration, outputPath, Math.PI, Math.pow(10, (2 * -1))); + // Clean input and output folder cleanup(fs, inputPath); cleanup(fs, outputPath); } private void matrixMult() throws Exception { - String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY); - Path matrixmultiplicationExec = new Path(examplesInstallPath + // Setup Paths + Path matrixmultiplicationExec = new Path(this.examplesInstallPath + EXAMPLE_MATRIXMULTIPLICATION_EXEC); Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/in"); @@ -192,10 +221,19 @@ public class TestPipes extends HamaClust bsp.setInputKeyClass(Text.class); bsp.setInputValueClass(Text.class); bsp.setOutputFormat(SequenceFileOutputFormat.class); - bsp.setOutputKeyClass(Text.class); + bsp.setOutputKeyClass(NullWritable.class); bsp.setOutputValueClass(DoubleWritable.class); bsp.set("bsp.message.class", DoubleWritable.class.getName()); + return bsp; + } + static BSPJob getPiestimatorJob(HamaConfiguration conf) throws IOException { + BSPJob bsp = new BSPJob(conf); + bsp.setInputFormat(NullInputFormat.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setOutputKeyClass(NullWritable.class); + bsp.setOutputValueClass(DoubleWritable.class); + bsp.set("bsp.message.class", IntWritable.class.getName()); return bsp; } @@ -211,10 +249,10 @@ public class TestPipes extends HamaClust bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts"); bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName()); - + bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true); bsp.setPartitioner(PipesPartitioner.class); - + // sort sent messages bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS, "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol"); @@ -256,7 +294,6 @@ public class TestPipes extends HamaClust * rand.nextDouble(); matrix[i][j] = new BigDecimal(randomValue).setScale(DOUBLE_PRECISION, BigDecimal.ROUND_DOWN).doubleValue(); - // matrix[i][j] = rand.nextInt(9) + 1; } } return matrix; @@ -350,22 +387,21 @@ public class TestPipes extends HamaClust } } - static void verifySummationOutput(HamaConfiguration conf, Path outputPath, - BigDecimal sum) throws IOException { + static void verifyOutput(HamaConfiguration conf, Path outputPath, + double expectedResult, double delta) throws IOException { FileStatus[] listStatus = fs.listStatus(outputPath); for (FileStatus status : listStatus) { if (!status.isDir()) { SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf); - Text key = new Text(); + NullWritable key = NullWritable.get(); DoubleWritable value = new DoubleWritable(); if (reader.next(key, value)) { LOG.info("Output File: " + status.getPath()); LOG.info("key: '" + key + "' value: '" + value + "' expected: '" - + sum.doubleValue() + "'"); - assertEquals("Expected value: '" + sum + "' != '" + value + "'", - sum.doubleValue(), value.get(), - Math.pow(10, (DOUBLE_PRECISION * -1))); + + expectedResult + "'"); + assertEquals("Expected value: '" + expectedResult + "' != '" + value + + "'", expectedResult, value.get(), delta); } reader.close(); } @@ -417,9 +453,6 @@ public class TestPipes extends HamaClust FileInputFormat.setInputPaths(bsp, inputPath); FileOutputFormat.setOutputPath(bsp, outputPath); - Submitter.setIsJavaRecordReader(conf, true); - Submitter.setIsJavaRecordWriter(conf, true); - BSPJobClient jobClient = new BSPJobClient(conf); // Set bspTaskNum
