Author: tjungblut Date: Thu Mar 8 19:35:29 2012 New Revision: 1298532 URL: http://svn.apache.org/viewvc?rev=1298532&view=rev Log: Testcase fix (contributed by Suraj Menon)
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1298532&r1=1298531&r2=1298532&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Mar 8 19:35:29 2012 @@ -59,18 +59,24 @@ public class TestBSPTaskFaults extends T public static final Log LOG = LogFactory.getLog(HamaTestCase.class); - private static final int PORT = 54321; public static final String TEST_POINT = "bsp.ft.test.point"; - + public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport"; + private static int TEST_NUMBER = 0; + + private volatile MinimalGroomServer groom; private volatile BSPPeerProtocol umbilical; private Server workerServer; private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID( "job_201110302255", 1), 1), 1); - public volatile static HamaConfiguration conf; + public volatile HamaConfiguration conf; private ScheduledExecutorService testBSPTaskService; + + private static synchronized int incrementTestNumber(){ + return ++TEST_NUMBER; + } @SuppressWarnings("unused") public static class MinimalGroomServer implements BSPPeerProtocol { @@ -195,11 +201,15 @@ public class TestBSPTaskFaults extends T private Process bspTaskProcess; private Thread errorLog; private Thread infoLog; + private int testPoint; + private int testPort; - TestBSPProcessRunner() { + TestBSPProcessRunner(int point, int port) { sched = Executors.newScheduledThreadPool(1); future = new AtomicReference<ScheduledFuture<Integer>>(); bspTaskProcess = null; + testPoint = point; + testPort = port; } private void readStream(InputStream input) throws IOException { @@ -253,8 +263,9 @@ public class TestBSPTaskFaults extends T commands.add(TestBSPProcessRunner.class.getName()); LOG.info("starting process for failure case - " - + conf.getInt(TEST_POINT, 0)); - commands.add("" + conf.getInt(TEST_POINT, 0)); + + testPoint); + commands.add("" + testPoint); + commands.add("" + testPort); LOG.info(commands.toString()); @@ -308,16 +319,14 @@ public class TestBSPTaskFaults extends T TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID( "job_201110102255", 1), 1), 1); - if (args.length > 0) { - hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0])); - - } + hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0])); + int port = Integer.parseInt(args[1]); try { BSPJob job = new BSPJob(hamaConf); final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( BSPPeerProtocol.class, BSPPeerProtocol.versionID, - new InetSocketAddress("127.0.0.1", 54321), hamaConf); + new InetSocketAddress("127.0.0.1", port), hamaConf); BSPTask task = new BSPTask(); task.setConf(job); @@ -406,7 +415,8 @@ public class TestBSPTaskFaults extends T conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalBSPRunner.LocalSyncClient.class, SyncClient.class); - InetSocketAddress inetAddress = new InetSocketAddress(PORT); + int testNumber = incrementTestNumber(); + InetSocketAddress inetAddress = new InetSocketAddress(54321 + testNumber); groom = new MinimalGroomServer(conf); workerServer = RPC.getServer(groom, inetAddress.getHostName(), inetAddress.getPort(), conf); @@ -444,7 +454,7 @@ public class TestBSPTaskFaults extends T CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( this.testBSPTaskService); Future<Integer> future = completionService - .submit(new TestBSPProcessRunner()); + .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort())); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -471,10 +481,11 @@ public class TestBSPTaskFaults extends T LOG.info("Testing ping failure case - 1"); conf.setInt(TEST_POINT, 1); - CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( - this.testBSPTaskService); + CompletionService<Integer> completionService = + new ExecutorCompletionService<Integer>(this.testBSPTaskService); Future<Integer> future = completionService - .submit(new TestBSPProcessRunner()); + .submit(new TestBSPProcessRunner(1, + workerServer.getListenerAddress().getPort())); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -496,10 +507,11 @@ public class TestBSPTaskFaults extends T LOG.info("Testing ping failure case - 2"); conf.setInt(TEST_POINT, 2); - CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( - this.testBSPTaskService); + CompletionService<Integer> completionService = + new ExecutorCompletionService<Integer>(this.testBSPTaskService); Future<Integer> future = completionService - .submit(new TestBSPProcessRunner()); + .submit(new TestBSPProcessRunner(2, + workerServer.getListenerAddress().getPort())); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -522,10 +534,12 @@ public class TestBSPTaskFaults extends T LOG.info("Testing ping failure case - 3"); conf.setInt(TEST_POINT, 3); - CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( - this.testBSPTaskService); + CompletionService<Integer> completionService = + new ExecutorCompletionService<Integer>(this.testBSPTaskService); + Future<Integer> future = completionService - .submit(new TestBSPProcessRunner()); + .submit(new TestBSPProcessRunner(3, + workerServer.getListenerAddress().getPort())); try { future.get(20000, TimeUnit.MILLISECONDS); @@ -546,10 +560,11 @@ public class TestBSPTaskFaults extends T public void testBSPTaskSelfDestroy() { LOG.info("Testing self kill on lost contact."); - CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( - this.testBSPTaskService); + CompletionService<Integer> completionService = + new ExecutorCompletionService<Integer>(this.testBSPTaskService); Future<Integer> future = completionService - .submit(new TestBSPProcessRunner()); + .submit(new TestBSPProcessRunner(0, + workerServer.getListenerAddress().getPort())); try { while (groom.pingCount == 0) { Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1298532&r1=1298531&r2=1298532&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Mar 8 19:35:29 2012 @@ -36,6 +36,7 @@ import org.apache.hama.bsp.messages.Byte import org.apache.hama.bsp.sync.SyncClient; import org.apache.hama.bsp.sync.SyncServiceFactory; import org.apache.hama.ipc.BSPPeerProtocol; +import org.apache.hama.util.BSPNetUtils; public class TestCheckpoint extends TestCase { @@ -89,7 +90,7 @@ public class TestCheckpoint extends Test conf.setBoolean(Constants.CHECKPOINT_ENABLED, false); - int port = 54321; + int port = BSPNetUtils.getFreePort(5000); InetSocketAddress inetAddress = new InetSocketAddress(port); MinimalGroomServer groom = new MinimalGroomServer(conf); Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),