Author: edwardyoon Date: Tue Apr 24 04:06:13 2012 New Revision: 1329544 URL: http://svn.apache.org/viewvc?rev=1329544&view=rev Log: Record Reader/Writer objects should be initialized
Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1329544&r1=1329543&r2=1329544&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Tue Apr 24 04:06:13 2012 @@ -16,6 +16,7 @@ Release 0.5 - April 10, 2012 IMPROVEMENTS + HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon) HAMA-555: Separate bin and src distributions (edwardyoon) HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module (edwardyoon) HAMA-545: Include the API and other docs in the Hama release (Suraj Menon via edwardyoon) Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1329544&r1=1329543&r2=1329544&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 24 04:06:13 2012 @@ -52,10 +52,7 @@ public final class BSPPeerImpl<K1, V1, K private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class); public static enum PeerCounter { - SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, - IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, - TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, - COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS + SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS } private final Configuration conf; @@ -185,39 +182,40 @@ public final class BSPPeerImpl<K1, V1, K initInput(); - // just output something when the user configured it + String outdir = null; if (conf.get("bsp.output.dir") != null) { - Path outdir = new Path(conf.get("bsp.output.dir"), - Task.getOutputName(partition)); - outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, - outdir.makeQualified(fs).toString()); - final RecordWriter<K2, V2> finalOut = outWriter; - - collector = new OutputCollector<K2, V2>() { - public void collect(K2 key, V2 value) throws IOException { - finalOut.write(key, value); - } - }; - } + Path outputDir = new Path(conf.get("bsp.output.dir", + "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition)); + outdir = outputDir.makeQualified(fs).toString(); + } + outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir); + final RecordWriter<K2, V2> finalOut = outWriter; + + collector = new OutputCollector<K2, V2>() { + public void collect(K2 key, V2 value) throws IOException { + finalOut.write(key, value); + } + }; } @SuppressWarnings("unchecked") public final void initInput() throws IOException { - // just read input if the user defined one - if (conf.get("bsp.input.dir") != null) { - InputSplit inputSplit = null; - // reinstantiate the split - try { + InputSplit inputSplit = null; + // reinstantiate the split + try { + if (splitClass != null) { inputSplit = (InputSplit) ReflectionUtils.newInstance( getConfiguration().getClassByName(splitClass), getConfiguration()); - } catch (ClassNotFoundException exp) { - IOException wrap = new IOException("Split class " + splitClass - + " not found"); - wrap.initCause(exp); - throw wrap; } + } catch (ClassNotFoundException exp) { + IOException wrap = new IOException("Split class " + splitClass + + " not found"); + wrap.initCause(exp); + throw wrap; + } + if (inputSplit != null) { DataInputBuffer splitBuffer = new DataInputBuffer(); splitBuffer.reset(split.getBytes(), 0, split.getLength()); inputSplit.readFields(splitBuffer); Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1329544&r1=1329543&r2=1329544&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Tue Apr 24 04:06:13 2012 @@ -171,7 +171,7 @@ public class TestBSPTaskFaults extends T private class TestBSPTaskThreadRunner extends Thread { BSPJob job; - + TestBSPTaskThreadRunner(BSPJob jobConf) { job = jobConf; } @@ -325,6 +325,9 @@ public class TestBSPTaskFaults extends T try { BSPJob job = new BSPJob(hamaConf); + job.setInputFormat(NullInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( BSPPeerProtocol.class, BSPPeerProtocol.versionID, new InetSocketAddress("127.0.0.1", port), hamaConf);