Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1369551&r1=1369550&r2=1369551&view=diff ============================================================================== --- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original) +++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Aug 5 11:07:48 2012 @@ -17,7 +17,18 @@ */ package org.apache.hama.bsp; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import junit.framework.TestCase; @@ -25,19 +36,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer; -import org.apache.hama.bsp.message.type.ByteMessage; -import org.apache.hama.bsp.sync.SyncClient; +import org.apache.hama.bsp.Counters.Counter; +import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl; +import org.apache.hama.bsp.ft.FaultTolerantPeerService; +import org.apache.hama.bsp.message.MessageEventListener; +import org.apache.hama.bsp.message.MessageManager; +import org.apache.hama.bsp.message.MessageQueue; +import org.apache.hama.bsp.sync.BSPPeerSyncClient; +import org.apache.hama.bsp.sync.PeerSyncClient; +import org.apache.hama.bsp.sync.SyncEvent; +import org.apache.hama.bsp.sync.SyncEventListener; +import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.bsp.sync.SyncServiceFactory; -import org.apache.hama.ipc.BSPPeerProtocol; -import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.util.BSPNetUtils; +import org.apache.hama.util.KeyValuePair; public class TestCheckpoint extends TestCase { @@ -45,130 +67,578 @@ public class TestCheckpoint extends Test static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/"; - @SuppressWarnings({ "unchecked", "rawtypes" }) + public static class TestMessageManager implements MessageManager<Text> { + + List<Text> messageQueue = new ArrayList<Text>(); + BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>(); + Iterator<Text> iter = null; + MessageEventListener<Text> listener; + + @Override + public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer, + Configuration conf, InetSocketAddress peerAddress) { + // TODO Auto-generated method stub + + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public Text getCurrentMessage() throws IOException { + if (iter == null) + iter = this.messageQueue.iterator(); + if (iter.hasNext()) + return iter.next(); + return null; + } + + @Override + public void send(String peerName, Text msg) throws IOException { + } + + @Override + public void finishSendPhase() throws IOException { + } + + @Override + public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() { + return null; + } + + @Override + public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle) + throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void clearOutgoingQueues() { + } + + @Override + public int getNumCurrentMessages() { + return this.messageQueue.size(); + } + + public BSPMessageBundle<Text> getLoopbackBundle() { + return this.loopbackBundle; + } + + public void addMessage(Text message) throws IOException { + this.messageQueue.add(message); + listener.onMessageReceived(message); + } + + @SuppressWarnings("unchecked") + @Override + public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) { + this.loopbackBundle = (BSPMessageBundle<Text>) bundle; + } + + @Override + public void loopBackMessage(Writable message) { + } + + @Override + public void registerListener(MessageEventListener<Text> listener) + throws IOException { + this.listener = listener; + } + + } + + public static class TestBSPPeer implements + BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> { + + Configuration conf; + long superstepCount; + FaultTolerantPeerService<Text> fService; + + public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId, + Counters counters, long superstep, BSPPeerSyncClient syncClient, + MessageManager<Text> messenger, TaskStatus.State state) { + this.conf = conf; + if (superstep > 0) + superstepCount = superstep; + else + superstepCount = 0L; + + try { + fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance( + job, (BSPPeer<?, ?, ?, ?, Text>) this, + (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf, + messenger); + this.fService.onPeerInitialized(state); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void send(String peerName, Text msg) throws IOException { + } + + @Override + public Text getCurrentMessage() throws IOException { + return new Text("data"); + } + + @Override + public int getNumCurrentMessages() { + return 1; + } + + @Override + public void sync() throws IOException, SyncException, InterruptedException { + ++superstepCount; + try { + this.fService.afterBarrier(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("After barrier " + superstepCount); + } + + @Override + public long getSuperstepCount() { + return superstepCount; + } + + @Override + public String getPeerName() { + return null; + } + + @Override + public String getPeerName(int index) { + return null; + } + + @Override + public int getPeerIndex() { + return 1; + } + + @Override + public String[] getAllPeerNames() { + return null; + } + + @Override + public int getNumPeers() { + return 0; + } + + @Override + public void clear() { + + } + + @Override + public void write(NullWritable key, NullWritable value) throws IOException { + + } + + @Override + public boolean readNext(NullWritable key, NullWritable value) + throws IOException { + return false; + } + + @Override + public KeyValuePair<NullWritable, NullWritable> readNext() + throws IOException { + return null; + } + + @Override + public void reopenInput() throws IOException { + + } + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Counter getCounter(Enum<?> name) { + return null; + } + + @Override + public Counter getCounter(String group, String name) { + return null; + } + + @Override + public void incrementCounter(Enum<?> key, long amount) { + + } + + @Override + public void incrementCounter(String group, String counter, long amount) { + + } + + } + + public static class TempSyncClient extends BSPPeerSyncClient { + + Map<String, Writable> valueMap = new HashMap<String, Writable>(); + + @Override + public String constructKey(BSPJobID jobId, String... args) { + StringBuffer buffer = new StringBuffer(100); + buffer.append(jobId.toString()).append("/"); + for (String arg : args) { + buffer.append(arg).append("/"); + } + return buffer.toString(); + } + + @Override + public boolean storeInformation(String key, Writable value, + boolean permanent, SyncEventListener listener) { + ArrayWritable writables = (ArrayWritable) value; + long step = ((LongWritable) writables.get()[0]).get(); + long count = ((LongWritable) writables.get()[1]).get(); + + LOG.info("SyncClient Storing value step = " + step + " count = " + count + + " for key " + key); + valueMap.put(key, value); + return true; + } + + @Override + public boolean getInformation(String key, + Writable valueHolder) { + LOG.info("Getting value for key " + key); + if(!valueMap.containsKey(key)){ + return false; + } + Writable value = valueMap.get(key); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteStream); + byte[] data = null; + try { + value.write(outputStream); + outputStream.flush(); + data = byteStream.toByteArray(); + ByteArrayInputStream istream = new ByteArrayInputStream(data); + DataInputStream diStream = new DataInputStream(istream); + valueHolder.readFields(diStream); + return true; + } catch (IOException e) { + LOG.error("Error writing data to write buffer.", e); + } finally { + try { + byteStream.close(); + outputStream.close(); + } catch (IOException e) { + LOG.error("Error closing byte stream.", e); + } + } + return false; + } + + @Override + public boolean addKey(String key, boolean permanent, + SyncEventListener listener) { + valueMap.put(key, NullWritable.get()); + return true; + } + + @Override + public boolean hasKey(String key) { + return valueMap.containsKey(key); + } + + @Override + public String[] getChildKeySet(String key, SyncEventListener listener) { + List<String> list = new ArrayList<String>(); + Iterator<String> keyIter = valueMap.keySet().iterator(); + while (keyIter.hasNext()) { + String keyVal = keyIter.next(); + if (keyVal.startsWith(key + "/")) { + list.add(keyVal); + } + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + @Override + public boolean registerListener(String key, SyncEvent event, + SyncEventListener listener) { + return false; + } + + @Override + public boolean remove(String key, SyncEventListener listener) { + valueMap.remove(key); + return false; + } + + @Override + public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId) + throws Exception { + } + + @Override + public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, + long superstep) throws SyncException { + LOG.info("Enter barrier called - " + superstep); + } + + @Override + public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, + long superstep) throws SyncException { + LOG.info("Exit barrier called - " + superstep); + } + + @Override + public void register(BSPJobID jobId, TaskAttemptID taskId, + String hostAddress, long port) { + } + + @Override + public String[] getAllPeerNames(TaskAttemptID taskId) { + return null; + } + + @Override + public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId, + String hostAddress, long port) { + } + + @Override + public void stopServer() { + } + + @Override + public void close() throws IOException { + } + + } + + private void checkSuperstepMsgCount(PeerSyncClient syncClient, + @SuppressWarnings("rawtypes") + BSPPeer bspTask, BSPJob job, long step, long count) { + + ArrayWritable writableVal = new ArrayWritable(LongWritable.class); + + boolean result = syncClient.getInformation( + syncClient.constructKey(job.getJobID(), "checkpoint", + "" + bspTask.getPeerIndex()), writableVal); + + assertTrue(result); + + LongWritable superstepNo = (LongWritable) writableVal.get()[0]; + LongWritable msgCount = (LongWritable) writableVal.get()[1]; + + assertEquals(step, superstepNo.get()); + assertEquals(count, msgCount.get()); + } + + public void testCheckpointInterval() throws Exception { + Configuration config = new Configuration(); + System.setProperty("user.dir", "/tmp"); + config.set(SyncServiceFactory.SYNC_PEER_CLASS, + TempSyncClient.class.getName()); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + config.setInt(Constants.CHECKPOINT_INTERVAL, 2); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); + + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); + + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + @SuppressWarnings("rawtypes") + BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, + (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); + + assertNotNull("BSPPeerImpl should not be null.", bspTask); + + LOG.info("Created bsp peer and other parameters"); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); + + boolean result = syncClient.getInformation( + syncClient.constructKey(job.getJobID(), "checkpoint", + "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class)); + + assertFalse(result); + + bspTask.sync(); + // Superstep 1 + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + Text txtMessage = new Text("data"); + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 2 + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 3 + + checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L); + + bspTask.sync(); + // Superstep 4 + + checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L); + + messenger.addMessage(txtMessage); + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 5 + + checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); + + bspTask.sync(); + // Superstep 6 + + checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); + + dfs.delete(new Path("checkpoint"), true); + } + + @SuppressWarnings("rawtypes") public void testCheckpoint() throws Exception { Configuration config = new Configuration(); config.set(SyncServiceFactory.SYNC_PEER_CLASS, - LocalBSPRunner.LocalSyncClient.class.getName()); + TempSyncClient.class.getName()); + config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); + + config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + config.setInt(Constants.PEER_PORT, port); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); + + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, + (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); - BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs); - bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), - new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running", - "127.0.0.1", TaskStatus.Phase.STARTING, new Counters())); assertNotNull("BSPPeerImpl should not be null.", bspTask); - if (dfs.mkdirs(new Path("checkpoint"))) { - if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) { - if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0"))) - ; - } - } - assertTrue("Make sure directory is created.", - dfs.exists(new Path(checkpointedDir))); - byte[] tmpData = "data".getBytes(); - BSPMessageBundle bundle = new BSPMessageBundle(); - bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData)); - assertNotNull("Message bundle can not be null.", bundle); - assertNotNull("Configuration should not be null.", config); - bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0", - bundle); - FSDataInputStream in = dfs.open(new Path(checkpointedDir - + "/attempt_201110302255_0001_000000_0")); - BSPMessageBundle bundleRead = new BSPMessageBundle(); - bundleRead.readFields(in); - in.close(); - ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0); - String content = new String(byteMsg.getData()); - LOG.info("Saved checkpointed content is " + content); - assertTrue("Message content should be the same.", "data".equals(content)); + + LOG.info("Created bsp peer and other parameters"); + + @SuppressWarnings("unused") + FaultTolerantPeerService<Text> service = null; + + bspTask.sync(); + LOG.info("Completed first sync."); + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + Text txtMessage = new Text("data"); + messenger.addMessage(txtMessage); + + bspTask.sync(); + + LOG.info("Completed second sync."); + + checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L); + + // Checking the messages for superstep 2 and peer id 1 + String expectedPath = "checkpoint/job_checkpttest_0001/2/1"; + FSDataInputStream in = dfs.open(new Path(expectedPath)); + + String className = in.readUTF(); + Text message = (Text) ReflectionUtils.newInstance(Class.forName(className), + config); + message.readFields(in); + + assertEquals("data", message.toString()); + dfs.delete(new Path("checkpoint"), true); } - public void testCheckpointInterval() throws Exception { + public void testPeerRecovery() throws Exception { + Configuration config = new Configuration(); + config.set(SyncServiceFactory.SYNC_PEER_CLASS, + TempSyncClient.class.getName()); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); - Configuration conf = new Configuration(); - conf.set("bsp.output.dir", "/tmp/hama-test_out"); - conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, - LocalBSPRunner.LocalSyncClient.class, SyncClient.class); - - conf.setBoolean(Constants.CHECKPOINT_ENABLED, false); - - int port = BSPNetUtils.getFreePort(5000); - InetSocketAddress inetAddress = new InetSocketAddress(port); - MinimalGroomServer groom = new MinimalGroomServer(conf); - Server workerServer = RPC.getServer(groom, inetAddress.getHostName(), - inetAddress.getPort(), conf); - workerServer.start(); - - LOG.info("Started RPC server"); - conf.setInt("bsp.groom.rpc.port", inetAddress.getPort()); - conf.setInt("bsp.peers.num", 1); - - BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, - conf); - LOG.info("Started the proxy connections"); - - TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID( - "job_201110102255", 1), 1), 1); - - try { - BSPJob job = new BSPJob(new HamaConfiguration(conf)); - job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH); - job.setOutputFormat(TextOutputFormat.class); - final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, - new InetSocketAddress("127.0.0.1", port), conf); + config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + config.setInt(Constants.PEER_PORT, port); - BSPTask task = new BSPTask(); - task.setConf(job); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); - @SuppressWarnings("rawtypes") - BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid, - proto, 0, null, null, new Counters()); + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); - bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f, - TaskStatus.State.RUNNING, "running", "127.0.0.1", - TaskStatus.Phase.STARTING, new Counters())); - - assertEquals(bspPeer.isReadyToCheckpoint(), false); - - conf.setBoolean(Constants.CHECKPOINT_ENABLED, true); - conf.setInt(Constants.CHECKPOINT_INTERVAL, 3); - - bspPeer.sync(); - - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), true); - - job.setCheckPointInterval(5); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - - } catch (Exception e) { - LOG.error("Error testing BSPPeer.", e); - } finally { - umbilical.close(); - Thread.sleep(2000); - workerServer.stop(); - Thread.sleep(2000); + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + + Text txtMessage = new Text("data"); + String writeKey = "job_checkpttest_0001/checkpoint/1/"; + + Writable[] writableArr = new Writable[2]; + writableArr[0] = new LongWritable(3L); + writableArr[1] = new LongWritable(5L); + ArrayWritable arrWritable = new ArrayWritable(LongWritable.class); + arrWritable.set(writableArr); + syncClient.storeInformation(writeKey, arrWritable, true, null); + + String writePath = "checkpoint/job_checkpttest_0001/3/1"; + FSDataOutputStream out = dfs.create(new Path(writePath)); + for (int i = 0; i < 5; ++i) { + out.writeUTF(txtMessage.getClass().getCanonicalName()); + txtMessage.write(out); } + out.close(); + @SuppressWarnings("unused") + BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId, + new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger, + TaskStatus.State.RECOVERING); + + BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle(); + assertEquals(5, bundleRead.getMessages().size()); + String recoveredMsg = bundleRead.getMessages().get(0).toString(); + assertEquals(recoveredMsg, "data"); + dfs.delete(new Path("checkpoint"), true); } + }
Added: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java?rev=1369551&view=auto ============================================================================== --- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java (added) +++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java Sun Aug 5 11:07:48 2012 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPJobClient.RawSplit; +import org.apache.hama.bsp.taskallocation.BSPResource; +import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator; +import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy; + +public class TestTaskAllocation extends TestCase { + + public static final Log LOG = LogFactory.getLog(TestTaskAllocation.class); + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testBestEffortDataLocality() throws Exception { + + Configuration conf = new Configuration(); + + String[] locations = new String[] { "host6", "host4", "host3" }; + String value = "data"; + RawSplit split = new RawSplit(); + split.setLocations(locations); + split.setBytes(value.getBytes(), 0, value.getBytes().length); + split.setDataLength(value.getBytes().length); + + assertEquals(value.getBytes().length, (int) split.getDataLength()); + + Map<GroomServerStatus, Integer> taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>( + 20); + BSPResource[] resources = new BSPResource[0]; + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf); + TaskInProgress taskInProgress = new TaskInProgress(job.getJobID(), + "job.xml", split, conf, jobProgress, 1); + + Map<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>( + 20); + + for (int i = 0; i < 10; ++i) { + + String name = "host" + i; + GroomServerStatus status = new GroomServerStatus(name, + new ArrayList<TaskStatus>(), 0, 3); + groomStatuses.put(name, status); + taskCountInGroomMap.put(status, 0); + + } + + TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf + .getClass("", BestEffortDataLocalTaskAllocator.class, + TaskAllocationStrategy.class), conf); + + String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap, + resources, taskInProgress); + + List<String> list = new ArrayList<String>(); + + for (int i = 0; i < hosts.length; ++i) { + list.add(hosts[i]); + } + + assertTrue(list.contains("host6")); + assertTrue(list.contains("host3")); + assertTrue(list.contains("host4")); + + } + +} Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1369551&r1=1369550&r2=1369551&view=diff ============================================================================== --- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original) +++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sun Aug 5 11:07:48 2012 @@ -23,7 +23,9 @@ import java.io.IOException; import junit.framework.TestCase; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; @@ -41,6 +43,7 @@ public class TestZooKeeper extends TestC public TestZooKeeper() { configuration = new HamaConfiguration(); + System.setProperty("user.dir", "/tmp"); configuration.set("bsp.master.address", "localhost"); assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); @@ -144,12 +147,32 @@ public class TestZooKeeper extends TestC Log.info("Passed the key presence test"); - Writable value = masterClient - .getInformation(masterClient.constructKey(jobID, "info", "level2"), - IntWritable.class); - - assertEquals(null, value); - Log.info("Passed the null value check."); + boolean result = masterClient + .getInformation(masterClient.constructKey(jobID, "info", "level3"), + new IntWritable()); + + assertEquals(false, result); + + Writable[] writableArr = new Writable[2]; + writableArr[0] = new LongWritable(3L); + writableArr[1] = new LongWritable(5L); + ArrayWritable arrWritable = new ArrayWritable(LongWritable.class); + arrWritable.set(writableArr); + masterClient.storeInformation( + masterClient.constructKey(jobID, "info", "level3"), + arrWritable, true, null); + + ArrayWritable valueHolder = new ArrayWritable(LongWritable.class); + + boolean getResult = masterClient.getInformation( + masterClient.constructKey(jobID, "info", "level3"), valueHolder); + + assertTrue(getResult); + + assertEquals(arrWritable.get()[0], valueHolder.get()[0]); + assertEquals(arrWritable.get()[1], valueHolder.get()[1]); + + Log.info("Passed array writable test"); done = true; } catch (Exception e) { Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1369551&r1=1369550&r2=1369551&view=diff ============================================================================== --- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original) +++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Sun Aug 5 11:07:48 2012 @@ -17,7 +17,6 @@ */ package org.apache.hama.bsp.sync; -import java.io.File; import java.util.concurrent.Executors; import junit.framework.TestCase; @@ -36,38 +35,36 @@ import org.apache.hama.util.BSPNetUtils; public class TestSyncServiceFactory extends TestCase { public static final Log LOG = LogFactory.getLog(TestCase.class); - - public static class ListenerTest extends ZKSyncEventListener{ + + public static class ListenerTest extends ZKSyncEventListener { private Text value; - - public ListenerTest(){ + + public ListenerTest() { value = new Text("init"); } - - public String getValue(){ + + public String getValue() { return value.toString(); } - + @Override public void onDelete() { - // TODO Auto-generated method stub - + } @Override public void onChange() { LOG.info("ZK value changed event triggered."); value.set("Changed"); - + } @Override public void onChildKeySetChange() { - // TODO Auto-generated method stub - + } - + } public void testClientInstantiation() throws Exception { @@ -96,7 +93,6 @@ public class TestSyncServiceFactory exte @Override public void run() { - // TODO Auto-generated method stub try { server.start(); } catch (Exception e) { @@ -109,10 +105,13 @@ public class TestSyncServiceFactory exte public void testZKSyncStore() throws Exception { Configuration conf = new Configuration(); int zkPort = BSPNetUtils.getFreePort(21811); + conf.set("bsp.local.dir", "/tmp/hama-test"); + conf.set("bsp.output.dir", "/tmp/hama-test_out"); conf.setInt(Constants.PEER_PORT, zkPort); conf.set(Constants.ZOOKEEPER_QUORUM, "localhost"); conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort); conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000"); + System.setProperty("user.dir", "/tmp"); // given null, should return zookeeper final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf); syncServer.init(conf); @@ -123,8 +122,8 @@ public class TestSyncServiceFactory exte Thread.sleep(1000); - final PeerSyncClient syncClient = (PeerSyncClient) - SyncServiceFactory.getPeerSyncClient(conf); + final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory + .getPeerSyncClient(conf); assertTrue(syncClient instanceof ZooKeeperSyncClientImpl); BSPJobID jobId = new BSPJobID("abc", 1); TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1); @@ -142,69 +141,45 @@ public class TestSyncServiceFactory exte } }); - try { - IntWritable data = new IntWritable(5); - syncClient.storeInformation( - syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, - true, null); - - ListenerTest listenerTest = new ListenerTest(); - - - syncClient.registerListener( - syncClient.constructKey(jobId, String.valueOf(1L), "test"), - ZKSyncEventFactory.getValueChangeEvent(), - listenerTest); - - IntWritable value = (IntWritable) syncClient.getInformation( - syncClient.constructKey(jobId, String.valueOf(1L), "test"), - IntWritable.class); - assertTrue(value != null); - int intVal = value == null ? 0 : value.get(); - assertTrue(intVal == data.get()); - - data.set(6); - syncClient.storeInformation( - syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, - true, null); - value = (IntWritable) syncClient.getInformation( - syncClient.constructKey(jobId, String.valueOf(1L), "test"), - IntWritable.class); - - - intVal = value == null ? 0 : value.get(); - assertTrue(intVal == data.get()); - - Thread.sleep(5000); - - assertEquals(true, listenerTest.getValue().equals("Changed")); - - - syncServer.stopServer(); - } finally { - - String dir = System.getProperty("user.dir"); - LOG.info("Deleting zookeeper files in " + dir); - File zookeeperDir = new File(dir + File.separator + "nullzookeeper"); - if (zookeeperDir.exists()) { - File[] files = zookeeperDir.listFiles(); - for (File file : files) { - if (file.isDirectory()) { - File[] childFiles = file.listFiles(); - for (File childFile : childFiles) { - LOG.info("Deleting zookeeper file - " - + childFile.getAbsolutePath()); - childFile.delete(); - } - } else { - LOG.info("Deleting zookeeper file - " + file.getAbsolutePath()); - file.delete(); - } - } - zookeeperDir.delete(); + IntWritable data = new IntWritable(5); + syncClient.storeInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true, + null); + + ListenerTest listenerTest = new ListenerTest(); + + syncClient.registerListener( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + ZKSyncEventFactory.getValueChangeEvent(), listenerTest); + + IntWritable valueHolder = new IntWritable(); + boolean result = syncClient + .getInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + valueHolder); + assertTrue(result); + int intVal = valueHolder.get(); + assertTrue(intVal == data.get()); + + data.set(6); + syncClient.storeInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true, + null); + valueHolder = new IntWritable(); + result = syncClient + .getInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + valueHolder); + + assertTrue(result); + intVal = valueHolder.get(); + assertTrue(intVal == data.get()); - } - } + Thread.sleep(5000); + + assertEquals(true, listenerTest.getValue().equals("Changed")); + + syncServer.stopServer(); }
