http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java deleted file mode 100644 index cba3d9c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/TestFiPipelines.java +++ /dev/null @@ -1,247 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Random; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fi.FiTestUtil; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.datanode.BlockReceiverAspects; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestFiPipelines { - public static final Log LOG = LogFactory.getLog(TestFiPipelines.class); - - private static short REPL_FACTOR = 3; - private static final int RAND_LIMIT = 2000; - - private MiniDFSCluster cluster; - private DistributedFileSystem fs; - private static Configuration conf; - Random rand = new Random(RAND_LIMIT); - - static { - initLoggers(); - setConfiguration(); - } - - @Before - public void startUpCluster() throws IOException { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build(); - fs = (DistributedFileSystem) cluster.getFileSystem(); - } - - @After - synchronized public void shutDownCluster() throws IOException { - if (cluster != null) cluster.shutdown(); - } - - /** - * Test initiates and sets actions created by injection framework. The actions - * work with both aspects of sending acknologment packets in a pipeline. - * Creates and closes a file of certain length < packet size. - * Injected actions will check if number of visible bytes at datanodes equals - * to number of acknoleged bytes - * - * @throws IOException in case of an error - */ - @Test - public void pipeline_04() throws IOException { - final String METHOD_NAME = GenericTestUtils.getMethodName(); - if(LOG.isDebugEnabled()) { - LOG.debug("Running " + METHOD_NAME); - } - - final PipelinesTestUtil.PipelinesTest pipst = - (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest(); - - pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME)); - pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME)); - - Path filePath = new Path("/" + METHOD_NAME + ".dat"); - FSDataOutputStream fsOut = fs.create(filePath); - TestPipelines.writeData(fsOut, 2); - fs.close(); - } - - /** - * Similar to pipeline_04 but sends many packets into a pipeline - * @throws IOException in case of an error - */ - @Test - public void pipeline_05() throws IOException { - final String METHOD_NAME = GenericTestUtils.getMethodName(); - if(LOG.isDebugEnabled()) { - LOG.debug("Running " + METHOD_NAME); - } - - final PipelinesTestUtil.PipelinesTest pipst = - (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest(); - - pipst.fiCallSetNumBytes.set(new PipelinesTestUtil.ReceivedCheckAction(METHOD_NAME)); - pipst.fiCallSetBytesAcked.set(new PipelinesTestUtil.AckedCheckAction(METHOD_NAME)); - - Path filePath = new Path("/" + METHOD_NAME + ".dat"); - FSDataOutputStream fsOut = fs.create(filePath); - for (int i = 0; i < 17; i++) { - TestPipelines.writeData(fsOut, 23); - } - fs.close(); - } - - /** - * This quite tricky test prevents acknowledgement packets from a datanode - * This should block any write attempts after ackQueue is full. - * Test is blocking, so the MiniDFSCluster has to be killed harshly. - * @throws IOException in case of an error - */ - @Test - public void pipeline_06() throws IOException { - final String METHOD_NAME = GenericTestUtils.getMethodName(); - final int MAX_PACKETS = 80; - - if(LOG.isDebugEnabled()) { - LOG.debug("Running " + METHOD_NAME); - } - - final PipelinesTestUtil.PipelinesTest pipst = - (PipelinesTestUtil.PipelinesTest) PipelinesTestUtil.initTest(); - - pipst.setSuspend(true); // This is ack. suspend test - Path filePath = new Path("/" + METHOD_NAME + ".dat"); - FSDataOutputStream fsOut = fs.create(filePath); - - int cnt = 0; - try { - // At this point let's start an external checker thread, which will - // verify the test's results and shutdown the MiniDFSCluster for us, - // because what it's gonna do has BLOCKING effect on datanodes - QueueChecker cq = new QueueChecker(pipst, MAX_PACKETS); - cq.start(); - // The following value is explained by the fact that size of a packet isn't - // necessary equals to the value of - // DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY - // The actual logic is expressed in DFSClient#computePacketChunkSize - int bytesToSend = 700; - while (cnt < 100 && pipst.getSuspend()) { - if(LOG.isDebugEnabled()) { - LOG.debug("_06(): " + cnt++ + " sending another " + - bytesToSend + " bytes"); - } - TestPipelines.writeData(fsOut, bytesToSend); - } - } catch (Exception e) { - LOG.warn("Getting unexpected exception: ", e); - } - if(LOG.isDebugEnabled()) { - LOG.debug("Last queued packet number " + pipst.getLastQueued()); - } - assertTrue("Shouldn't be able to send more than 81 packet", pipst.getLastQueued() <= 81); - } - - private class QueueChecker extends Thread { - PipelinesTestUtil.PipelinesTest test; - final int MAX; - boolean done = false; - - public QueueChecker(PipelinesTestUtil.PipelinesTest handle, int maxPackets) { - test = handle; - MAX = maxPackets; - } - - @Override - public void run() { - while (!done) { - if(LOG.isDebugEnabled()) { - LOG.debug("_06: checking for the limit " + test.getLastQueued() + - " and " + MAX); - } - if (test.getLastQueued() >= MAX) { - if(LOG.isDebugEnabled()) { - LOG.debug("FI: Resume packets acking"); - } - test.setSuspend(false); //Do not suspend ack sending any more - done = true; - } - if (!done) - try { - if(LOG.isDebugEnabled()) { - LOG.debug("_06: MAX isn't reached yet. Current=" + - test.getLastQueued()); - } - sleep(100); - } catch (InterruptedException e) { } - } - - assertTrue("Shouldn't be able to send more than 81 packet", test.getLastQueued() <= 81); - try { - if(LOG.isDebugEnabled()) { - LOG.debug("_06: shutting down the cluster"); - } - // It has to be done like that, because local version of shutDownCluster() - // won't work, because it tries to close an instance of FileSystem too. - // Which is where the waiting is happening. - if (cluster !=null ) - shutDownCluster(); - } catch (Exception e) { - e.printStackTrace(); - } - if(LOG.isDebugEnabled()) { - LOG.debug("End QueueChecker thread"); - } - } - } - - private static void setConfiguration() { - conf = new Configuration(); - int customPerChecksumSize = 700; - int customBlockSize = customPerChecksumSize * 3; - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 100); - conf.setInt(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); - conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, customBlockSize / 2); - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 0); - } - - private static void initLoggers() { - GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL); - GenericTestUtils.setLogLevel(LogFactory.getLog(FSNamesystem.class), Level.ALL); - GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); - GenericTestUtils.setLogLevel(TestFiPipelines.LOG, Level.ALL); - GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); - GenericTestUtils.setLogLevel(FiTestUtil.LOG, Level.ALL); - GenericTestUtils.setLogLevel(BlockReceiverAspects.LOG, Level.ALL); - GenericTestUtils.setLogLevel(DFSClientAspects.LOG, Level.ALL); - } - -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj deleted file mode 100644 index 5d258de..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.PipelineTest; - -/** Aspect for ClientProtocol */ -public aspect ClientProtocolAspects { - public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class); - - pointcut addBlock(): - call(LocatedBlock ClientProtocol.addBlock(String, String,..)); - - after() returning(LocatedBlock lb): addBlock() { - PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest(); - if (pipelineTest != null) - LOG.info("FI: addBlock " - + pipelineTest.initPipeline(lb)); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj deleted file mode 100644 index 1b4fea3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.Pipeline; -import org.apache.hadoop.fi.PipelineTest; -import org.apache.hadoop.fi.ProbabilityModel; -import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; -import org.apache.hadoop.hdfs.server.datanode.BlockReceiver.PacketResponder; -import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest; -import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; - -/** - * This aspect takes care about faults injected into datanode.BlockReceiver - * class - */ -privileged public aspect BlockReceiverAspects { - public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class); - - BlockReceiver BlockReceiver.PacketResponder.getReceiver(){ - LOG.info("FI: getReceiver() " + getClass().getName()); - return BlockReceiver.this; - } - - pointcut callReceivePacket(BlockReceiver blockreceiver) : - call(* receivePacket(..)) && target(blockreceiver); - - before(BlockReceiver blockreceiver - ) throws IOException : callReceivePacket(blockreceiver) { - final String dnName = blockreceiver.getDataNode().getMachineName(); - final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId(); - LOG.info("FI: callReceivePacket, datanode=" + dnName); - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null) - dtTest.fiCallReceivePacket.run(dnId); - - if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) { - LOG.info("Before the injection point"); - Thread.dumpStack(); - throw new DiskOutOfSpaceException ("FI: injected fault point at " + - thisJoinPoint.getStaticPart( ).getSourceLocation()); - } - } - - pointcut callWritePacketToDisk(BlockReceiver blockreceiver) : - call(* writePacketToDisk(..)) && target(blockreceiver); - - before(BlockReceiver blockreceiver - ) throws IOException : callWritePacketToDisk(blockreceiver) { - LOG.info("FI: callWritePacketToDisk"); - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null) - dtTest.fiCallWritePacketToDisk.run( - blockreceiver.getDataNode().getDatanodeId()); - } - - pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder): - call(void PipelineAck.readFields(InputStream)) && this(responder); - - after(BlockReceiver.PacketResponder responder) - throws IOException: afterDownstreamStatusRead(responder) { - final DataNode d = responder.getReceiver().getDataNode(); - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null) - dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeId()); - } - - // Pointcuts and advises for TestFiPipelines - pointcut callSetNumBytes(BlockReceiver br, long offset) : - call (void ReplicaInPipelineInterface.setNumBytes(long)) - && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int)) - && args(offset) - && this(br); - - after(BlockReceiver br, long offset) : callSetNumBytes(br, offset) { - LOG.debug("FI: Received bytes To: " + br.datanode.getStorageId() + ": " + offset); - PipelineTest pTest = DataTransferTestUtil.getDataTransferTest(); - if (pTest == null) { - LOG.debug("FI: no pipeline has been found in receiving"); - return; - } - if (!(pTest instanceof PipelinesTest)) { - return; - } - NodeBytes nb = new NodeBytes(br.datanode.getDatanodeId(), offset); - try { - ((PipelinesTest)pTest).fiCallSetNumBytes.run(nb); - } catch (IOException e) { - LOG.fatal("FI: no exception is expected here!"); - } - } - - // Pointcuts and advises for TestFiPipelines - pointcut callSetBytesAcked(PacketResponder pr, long acked) : - call (void ReplicaInPipelineInterface.setBytesAcked(long)) - && withincode (void PacketResponder.run()) - && args(acked) - && this(pr); - - after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) { - PipelineTest pTest = DataTransferTestUtil.getDataTransferTest(); - if (pTest == null) { - LOG.debug("FI: no pipeline has been found in acking"); - return; - } - LOG.debug("FI: Acked total bytes from: " + - pr.getReceiver().datanode.getStorageId() + ": " + acked); - if (pTest instanceof PipelinesTest) { - bytesAckedService((PipelinesTest)pTest, pr, acked); - } - } - - private void bytesAckedService - (final PipelinesTest pTest, final PacketResponder pr, final long acked) { - NodeBytes nb = new NodeBytes(pr.getReceiver().datanode.getDatanodeId(), acked); - try { - pTest.fiCallSetBytesAcked.run(nb); - } catch (IOException e) { - LOG.fatal("No exception should be happening at this point"); - assert false; - } - } - - pointcut preventAckSending () : - call (void PipelineAck.write(OutputStream)) - && within (PacketResponder); - - static int ackCounter = 0; - void around () : preventAckSending () { - PipelineTest pTest = DataTransferTestUtil.getDataTransferTest(); - - if (pTest == null) { - LOG.debug("FI: remove first ack as expected"); - proceed(); - return; - } - if (!(pTest instanceof PipelinesTest)) { - LOG.debug("FI: remove first ack as expected"); - proceed(); - return; - } - if (((PipelinesTest)pTest).getSuspend()) { - LOG.debug("FI: suspend the ack"); - return; - } - LOG.debug("FI: remove first ack as expected"); - proceed(); - } - // End of pointcuts and advises for TestFiPipelines - - pointcut pipelineClose(BlockReceiver blockreceiver, long offsetInBlock, long seqno, - boolean lastPacketInBlock, int len, int endOfHeader) : - call (* BlockReceiver.receivePacket(long, long, boolean, int, int)) - && this(blockreceiver) - && args(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader); - - before(BlockReceiver blockreceiver, long offsetInBlock, long seqno, - boolean lastPacketInBlock, int len, int endOfHeader - ) throws IOException : pipelineClose(blockreceiver, offsetInBlock, seqno, - lastPacketInBlock, len, endOfHeader) { - if (len == 0) { - final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId(); - LOG.info("FI: pipelineClose, datanode=" + dnId.getName() - + ", offsetInBlock=" + offsetInBlock - + ", seqno=" + seqno - + ", lastPacketInBlock=" + lastPacketInBlock - + ", len=" + len - + ", endOfHeader=" + endOfHeader); - - final DataTransferTest test = DataTransferTestUtil.getDataTransferTest(); - if (test != null) { - test.fiPipelineClose.run(dnId); - } - } - } - - pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) : - call (void PipelineAck.readFields(InputStream)) - && this(packetresponder); - - after(BlockReceiver.PacketResponder packetresponder) throws IOException - : pipelineAck(packetresponder) { - final DatanodeID dnId = packetresponder.getReceiver().getDataNode().getDatanodeId(); - LOG.info("FI: fiPipelineAck, datanode=" + dnId); - - final DataTransferTest test = DataTransferTestUtil.getDataTransferTest(); - if (test != null) { - test.fiPipelineAck.run(dnId); - } - } - - pointcut blockFileClose(BlockReceiver blockreceiver) : - call(void close()) - && withincode(void BlockReceiver.close()) - && this(blockreceiver); - - after(BlockReceiver blockreceiver) throws IOException : blockFileClose(blockreceiver) { - final DatanodeID dnId = blockreceiver.getDataNode().getDatanodeId(); - LOG.info("FI: blockFileClose, datanode=" + dnId); - - final DataTransferTest test = DataTransferTestUtil.getDataTransferTest(); - if (test != null) { - test.fiBlockFileClose.run(dnId); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj deleted file mode 100644 index 2401d08..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.InputStream; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.datatransfer.Op; -import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; - -/** Aspect for DataTransferProtocol */ -public aspect DataTransferProtocolAspects { - public static final Log LOG = LogFactory.getLog( - DataTransferProtocolAspects.class); - /* - { - ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL); - } - */ - - pointcut receiverOp(DataXceiver dataxceiver): - call(Op Receiver.readOp()) && target(dataxceiver); - - after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) { - LOG.info("FI: receiverOp " + op + ", datanode=" - + dataxceiver.getDataNode().getDatanodeId().getName()); - } - - pointcut statusRead(DataXceiver dataxceiver): - call(BlockOpResponseProto BlockOpResponseProto.parseFrom(InputStream)) && this(dataxceiver); - - after(DataXceiver dataxceiver) returning(BlockOpResponseProto status - ) throws IOException: statusRead(dataxceiver) { - final DataNode d = dataxceiver.getDataNode(); - LOG.info("FI: statusRead " + status + ", datanode=" - + d.getDatanodeId().getName()); - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null) - dtTest.fiStatusRead.run(d.getDatanodeId()); - } - - pointcut receiverOpWriteBlock(DataXceiver dataxceiver): - call(void Receiver.opWriteBlock(DataInputStream)) && target(dataxceiver); - - before(DataXceiver dataxceiver - ) throws IOException: receiverOpWriteBlock(dataxceiver) { - LOG.info("FI: receiverOpWriteBlock"); - DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest(); - if (dtTest != null) - dtTest.fiReceiverOpWriteBlock.run( - dataxceiver.getDataNode().getDatanodeId()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj deleted file mode 100644 index 8f9b7b8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fi.ProbabilityModel; -import org.apache.hadoop.hdfs.server.datanode.FSDataset; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; -import org.apache.hadoop.util.DiskChecker.*; - - -/** - * This aspect takes care about faults injected into datanode.FSDatase class - */ -public aspect FSDatasetAspects { - public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class); - - pointcut execGetBlockFile() : - // the following will inject faults inside of the method in question - execution (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +); - - pointcut callCreateBlockWriteStream(ReplicaInPipeline repl) : - call (BlockWriteStreams createStreams(..)) - && target (repl) - && !within(FSDatasetAspects +); - - // This aspect specifies the logic of our fault point. - // In this case it simply throws DiskErrorException at the very beginning of - // invocation of the method, specified by callGetBlockFile() pointcut - before() throws DiskErrorException : execGetBlockFile() { - if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) { - LOG.info("Before the injection point"); - Thread.dumpStack(); - throw new DiskErrorException("FI: injected fault point at " - + thisJoinPoint.getStaticPart().getSourceLocation()); - } - } - - before(ReplicaInPipeline repl) throws DiskOutOfSpaceException : callCreateBlockWriteStream(repl) { - if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) { - LOG.info("Before the injection point"); - Thread.dumpStack(); - throw new DiskOutOfSpaceException("FI: injected fault point at " - + thisJoinPoint.getStaticPart().getSourceLocation()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java deleted file mode 100644 index e2b8aef..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; -import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction; -import org.apache.hadoop.fi.DataTransferTestUtil.OomAction; -import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction; -import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction; -import org.apache.hadoop.fi.FiTestUtil; -import org.apache.hadoop.fi.FiTestUtil.Action; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Test; - -/** Test DataTransferProtocol with fault injection. */ -public class TestFiDataTransferProtocol { - static final short REPLICATION = 3; - static final long BLOCKSIZE = 1L * (1L << 20); - - static final Configuration conf = new HdfsConfiguration(); - static { - conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION); - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); - } - - static private FSDataOutputStream createFile(FileSystem fs, Path p - ) throws IOException { - return fs.create(p, true, - fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, - 4096), REPLICATION, BLOCKSIZE); - } - - { - GenericTestUtils.setLogLevel(DataTransferProtocol.LOG, Level.ALL); - } - - /** - * 1. create files with dfs - * 2. write 1 byte - * 3. close file - * 4. open the same file - * 5. read the 1 byte and compare results - */ - static void write1byte(String methodName) throws IOException { - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf - ).numDataNodes(REPLICATION + 1).build(); - final FileSystem dfs = cluster.getFileSystem(); - try { - final Path p = new Path("/" + methodName + "/foo"); - final FSDataOutputStream out = createFile(dfs, p); - out.write(1); - out.close(); - - final FSDataInputStream in = dfs.open(p); - final int b = in.read(); - in.close(); - Assert.assertEquals(1, b); - } - finally { - dfs.close(); - cluster.shutdown(); - } - } - - private static void runSlowDatanodeTest(String methodName, SleepAction a - ) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); - t.fiCallReceivePacket.set(a); - t.fiReceiverOpWriteBlock.set(a); - t.fiStatusRead.set(a); - write1byte(methodName); - } - - private static void runReceiverOpWriteBlockTest(String methodName, - int errorIndex, Action<DatanodeID, IOException> a) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - t.fiReceiverOpWriteBlock.set(a); - t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName, - errorIndex)); - write1byte(methodName); - Assert.assertTrue(t.isSuccess()); - } - - private static void runStatusReadTest(String methodName, int errorIndex, - Action<DatanodeID, IOException> a) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - t.fiStatusRead.set(a); - t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName, - errorIndex)); - write1byte(methodName); - Assert.assertTrue(t.isSuccess()); - } - - private static void runCallWritePacketToDisk(String methodName, - int errorIndex, Action<DatanodeID, IOException> a) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); - t.fiCallWritePacketToDisk.set(a); - t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex)); - write1byte(methodName); - Assert.assertTrue(t.isSuccess()); - } - - /** - * Pipeline setup: - * DN0 never responses after received setup request from client. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_01() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 0, new SleepAction(methodName, 0, 0)); - } - - /** - * Pipeline setup: - * DN1 never responses after received setup request from client. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_02() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 1, new SleepAction(methodName, 1, 0)); - } - - /** - * Pipeline setup: - * DN2 never responses after received setup request from client. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_03() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 2, new SleepAction(methodName, 2, 0)); - } - - /** - * Pipeline setup, DN1 never responses after received setup ack from DN2. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_04() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runStatusReadTest(methodName, 1, new SleepAction(methodName, 1, 0)); - } - - /** - * Pipeline setup, DN0 never responses after received setup ack from DN1. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_05() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runStatusReadTest(methodName, 0, new SleepAction(methodName, 0, 0)); - } - - /** - * Pipeline setup with DN0 very slow but it won't lead to timeout. - * Client finishes setup successfully. - */ - @Test - public void pipeline_Fi_06() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000)); - } - - /** - * Pipeline setup with DN1 very slow but it won't lead to timeout. - * Client finishes setup successfully. - */ - @Test - public void pipeline_Fi_07() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000)); - } - - /** - * Pipeline setup with DN2 very slow but it won't lead to timeout. - * Client finishes setup successfully. - */ - @Test - public void pipeline_Fi_08() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runSlowDatanodeTest(methodName, new SleepAction(methodName, 2, 3000)); - } - - /** - * Pipeline setup, DN0 throws an OutOfMemoryException right after it - * received a setup request from client. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_09() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 0, new OomAction(methodName, 0)); - } - - /** - * Pipeline setup, DN1 throws an OutOfMemoryException right after it - * received a setup request from DN0. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_10() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 1, new OomAction(methodName, 1)); - } - - /** - * Pipeline setup, DN2 throws an OutOfMemoryException right after it - * received a setup request from DN1. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_11() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runReceiverOpWriteBlockTest(methodName, 2, new OomAction(methodName, 2)); - } - - /** - * Pipeline setup, DN1 throws an OutOfMemoryException right after it - * received a setup ack from DN2. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_12() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runStatusReadTest(methodName, 1, new OomAction(methodName, 1)); - } - - /** - * Pipeline setup, DN0 throws an OutOfMemoryException right after it - * received a setup ack from DN1. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_13() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runStatusReadTest(methodName, 0, new OomAction(methodName, 0)); - } - - /** - * Streaming: Write a packet, DN0 throws a DiskOutOfSpaceError - * when it writes the data to disk. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_14() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0)); - } - - /** - * Streaming: Write a packet, DN1 throws a DiskOutOfSpaceError - * when it writes the data to disk. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_15() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1)); - } - - /** - * Streaming: Write a packet, DN2 throws a DiskOutOfSpaceError - * when it writes the data to disk. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_16() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2)); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java deleted file mode 100644 index 52cd9c1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; -import java.util.Random; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction; -import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction; -import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction; -import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; -import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction; -import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction; -import org.apache.hadoop.fi.FiTestUtil; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.Assert; -import org.junit.Test; - -/** Test DataTransferProtocol with fault injection. */ -public class TestFiDataTransferProtocol2 { - static final short REPLICATION = 3; - static final long BLOCKSIZE = 1L * (1L << 20); - static final int PACKET_SIZE = 1024; - static final int MIN_N_PACKET = 3; - static final int MAX_N_PACKET = 10; - - static final int MAX_SLEEP = 1000; - - static final Configuration conf = new Configuration(); - static { - conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION); - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE); - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); - } - - static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE]; - static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE]; - - static private FSDataOutputStream createFile(FileSystem fs, Path p - ) throws IOException { - return fs.create(p, true, fs.getConf() - .getInt(IO_FILE_BUFFER_SIZE_KEY, 4096), REPLICATION, BLOCKSIZE); - } - - { - GenericTestUtils.setLogLevel(BlockReceiver.LOG, Level.ALL); - GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); - GenericTestUtils.setLogLevel(DataTransferProtocol.LOG, Level.ALL); - } - /** - * 1. create files with dfs - * 2. write MIN_N_PACKET to MAX_N_PACKET packets - * 3. close file - * 4. open the same file - * 5. read the bytes and compare results - */ - private static void writeSeveralPackets(String methodName) throws IOException { - final Random r = FiTestUtil.RANDOM.get(); - final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1); - final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1); - final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize; - - FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets - + ", lastPacketSize=" + lastPacketSize); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf - ).numDataNodes(REPLICATION + 2).build(); - final FileSystem dfs = cluster.getFileSystem(); - try { - final Path p = new Path("/" + methodName + "/foo"); - final FSDataOutputStream out = createFile(dfs, p); - - final long seed = r.nextLong(); - final Random ran = new Random(seed); - ran.nextBytes(bytes); - out.write(bytes, 0, size); - out.close(); - - final FSDataInputStream in = dfs.open(p); - int totalRead = 0; - int nRead = 0; - while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) { - totalRead += nRead; - } - Assert.assertEquals("Cannot read file.", size, totalRead); - for (int i = 0; i < size; i++) { - Assert.assertTrue("File content differ.", bytes[i] == toRead[i]); - } - } - finally { - dfs.close(); - cluster.shutdown(); - } - } - - private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a) - throws IOException { - t.fiCallReceivePacket.set(a); - t.fiReceiverOpWriteBlock.set(a); - t.fiStatusRead.set(a); - } - - private void runTest17_19(String methodName, int dnIndex) - throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP)); - initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP)); - initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP)); - t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3)); - t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); - writeSeveralPackets(methodName); - Assert.assertTrue(t.isSuccess()); - } - - private void runTest29_30(String methodName, int dnIndex) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, MAX_SLEEP)); - initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, MAX_SLEEP)); - initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, MAX_SLEEP)); - t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3)); - t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); - writeSeveralPackets(methodName); - Assert.assertTrue(t.isSuccess()); - } - - private void runTest34_35(String methodName, int dnIndex) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3)); - t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex)); - writeSeveralPackets(methodName); - Assert.assertTrue(t.isSuccess()); - } - /** - * Streaming: - * Randomize datanode speed, write several packets, - * DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk. - * Client gets an IOException and determines DN0 bad. - */ - @Test - public void pipeline_Fi_17() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest17_19(methodName, 0); - } - - /** - * Streaming: - * Randomize datanode speed, write several packets, - * DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk. - * Client gets an IOException and determines DN1 bad. - */ - @Test - public void pipeline_Fi_18() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest17_19(methodName, 1); - } - - /** - * Streaming: - * Randomize datanode speed, write several packets, - * DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk. - * Client gets an IOException and determines DN2 bad. - */ - @Test - public void pipeline_Fi_19() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest17_19(methodName, 2); - } - - /** - * Streaming: Client writes several packets with DN0 very slow. Client - * finishes write successfully. - */ - @Test - public void pipeline_Fi_20() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - initSlowDatanodeTest(t, new SleepAction(methodName, 0, MAX_SLEEP)); - writeSeveralPackets(methodName); - } - - /** - * Streaming: Client writes several packets with DN1 very slow. Client - * finishes write successfully. - */ - @Test - public void pipeline_Fi_21() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - initSlowDatanodeTest(t, new SleepAction(methodName, 1, MAX_SLEEP)); - writeSeveralPackets(methodName); - } - - /** - * Streaming: Client writes several packets with DN2 very slow. Client - * finishes write successfully. - */ - @Test - public void pipeline_Fi_22() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - initSlowDatanodeTest(t, new SleepAction(methodName, 2, MAX_SLEEP)); - writeSeveralPackets(methodName); - } - - /** - * Streaming: Randomize datanode speed, write several packets, DN1 throws a - * OutOfMemoryException when it receives the ack of the third packet from DN2. - * Client gets an IOException and determines DN1 bad. - */ - @Test - public void pipeline_Fi_29() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest29_30(methodName, 1); - } - - /** - * Streaming: Randomize datanode speed, write several packets, DN0 throws a - * OutOfMemoryException when it receives the ack of the third packet from DN1. - * Client gets an IOException and determines DN0 bad. - */ - @Test - public void pipeline_Fi_30() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest29_30(methodName, 0); - } - - /** - * Streaming: Write several packets, DN1 never responses when it receives the - * ack of the third packet from DN2. Client gets an IOException and determines - * DN1 bad. - */ - @Test - public void pipeline_Fi_34() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest34_35(methodName, 1); - } - - /** - * Streaming: Write several packets, DN0 never responses when it receives the - * ack of the third packet from DN1. Client gets an IOException and determines - * DN0 bad. - */ - @Test - public void pipeline_Fi_35() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runTest34_35(methodName, 0); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java deleted file mode 100644 index 1468222..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; - -import org.apache.hadoop.fi.DataTransferTestUtil; -import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction; -import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest; -import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction; -import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction; -import org.apache.hadoop.fi.DataTransferTestUtil.OomAction; -import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction; -import org.apache.hadoop.fi.FiTestUtil; -import org.apache.hadoop.fi.FiTestUtil.Action; -import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction; -import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.junit.Test; - -/** Test DataTransferProtocol with fault injection. */ -public class TestFiPipelineClose { - private static void runPipelineCloseTest(String methodName, - Action<DatanodeID, IOException> a) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - t.fiPipelineClose.set(a); - TestFiDataTransferProtocol.write1byte(methodName); - } - - /** - * Pipeline close: - * DN0 never responses after received close request from client. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_36() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0)); - } - - /** - * Pipeline close: - * DN1 never responses after received close request from client. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_37() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0)); - } - - /** - * Pipeline close: - * DN2 never responses after received close request from client. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_38() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0)); - } - - private static void run41_43(String name, int i) throws IOException { - runPipelineCloseTest(name, new SleepAction(name, i, 3000)); - } - - private static void runPipelineCloseAck(String name, int i, DataNodeAction a - ) throws IOException { - FiTestUtil.LOG.info("Running " + name + " ..."); - final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest(); - final MarkerConstraint marker = new MarkerConstraint(name); - t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker)); - t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker)); - TestFiDataTransferProtocol.write1byte(name); - } - - private static void run39_40(String name, int i) throws IOException { - runPipelineCloseAck(name, i, new SleepAction(name, i, 0)); - } - - /** - * Pipeline close: - * DN1 never responses after received close ack DN2. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_39() throws IOException { - run39_40(FiTestUtil.getMethodName(), 1); - } - - /** - * Pipeline close: - * DN0 never responses after received close ack DN1. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_40() throws IOException { - run39_40(FiTestUtil.getMethodName(), 0); - } - - /** - * Pipeline close with DN0 very slow but it won't lead to timeout. - * Client finishes close successfully. - */ - @Test - public void pipeline_Fi_41() throws IOException { - run41_43(FiTestUtil.getMethodName(), 0); - } - - /** - * Pipeline close with DN1 very slow but it won't lead to timeout. - * Client finishes close successfully. - */ - @Test - public void pipeline_Fi_42() throws IOException { - run41_43(FiTestUtil.getMethodName(), 1); - } - - /** - * Pipeline close with DN2 very slow but it won't lead to timeout. - * Client finishes close successfully. - */ - @Test - public void pipeline_Fi_43() throws IOException { - run41_43(FiTestUtil.getMethodName(), 2); - } - - /** - * Pipeline close: - * DN0 throws an OutOfMemoryException - * right after it received a close request from client. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_44() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new OomAction(methodName, 0)); - } - - /** - * Pipeline close: - * DN1 throws an OutOfMemoryException - * right after it received a close request from client. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_45() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new OomAction(methodName, 1)); - } - - /** - * Pipeline close: - * DN2 throws an OutOfMemoryException - * right after it received a close request from client. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_46() throws IOException { - final String methodName = FiTestUtil.getMethodName(); - runPipelineCloseTest(methodName, new OomAction(methodName, 2)); - } - - private static void run47_48(String name, int i) throws IOException { - runPipelineCloseAck(name, i, new OomAction(name, i)); - } - - /** - * Pipeline close: - * DN1 throws an OutOfMemoryException right after - * it received a close ack from DN2. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_47() throws IOException { - run47_48(FiTestUtil.getMethodName(), 1); - } - - /** - * Pipeline close: - * DN0 throws an OutOfMemoryException right after - * it received a close ack from DN1. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_48() throws IOException { - run47_48(FiTestUtil.getMethodName(), 0); - } - - private static void runBlockFileCloseTest(String methodName, - Action<DatanodeID, IOException> a) throws IOException { - FiTestUtil.LOG.info("Running " + methodName + " ..."); - final DataTransferTest t = (DataTransferTest) DataTransferTestUtil - .initTest(); - t.fiBlockFileClose.set(a); - TestFiDataTransferProtocol.write1byte(methodName); - } - - private static void run49_51(String name, int i) throws IOException { - runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR")); - } - - /** - * Pipeline close: - * DN0 throws a disk error exception when it is closing the block file. - * Client gets an IOException and determine DN0 bad. - */ - @Test - public void pipeline_Fi_49() throws IOException { - run49_51(FiTestUtil.getMethodName(), 0); - } - - - /** - * Pipeline close: - * DN1 throws a disk error exception when it is closing the block file. - * Client gets an IOException and determine DN1 bad. - */ - @Test - public void pipeline_Fi_50() throws IOException { - run49_51(FiTestUtil.getMethodName(), 1); - } - - /** - * Pipeline close: - * DN2 throws a disk error exception when it is closing the block file. - * Client gets an IOException and determine DN2 bad. - */ - @Test - public void pipeline_Fi_51() throws IOException { - run49_51(FiTestUtil.getMethodName(), 2); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/FileDataServletAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/FileDataServletAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/FileDataServletAspects.aj deleted file mode 100644 index 6e74bca..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/FileDataServletAspects.aj +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import java.net.URL; -import java.io.IOException; - -import javax.servlet.http.HttpServletRequest; - -import org.apache.commons.logging.Log; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.security.UserGroupInformation; - -public aspect FileDataServletAspects { - static final Log LOG = FileDataServlet.LOG; - - pointcut callCreateUrl() : call (URL FileDataServlet.createRedirectURL( - String, String, HdfsFileStatus, UserGroupInformation, ClientProtocol, - HttpServletRequest, String)); - - /** Replace host name with "localhost" for unit test environment. */ - URL around () throws IOException : callCreateUrl() { - final URL original = proceed(); - LOG.info("FI: original url = " + original); - final URL replaced = new URL("http", "localhost", original.getPort(), - original.getPath() + '?' + original.getQuery()); - LOG.info("FI: replaced url = " + replaced); - return replaced; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj deleted file mode 100644 index 223ef26..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/ListPathAspects.aj +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import java.io.IOException; - -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; - -/** - * The aspects here are used for testing HDFS implementation of iterative - * directory listing functionality. A directory is deleted right after - * the first listPath RPC. - */ -public privileged aspect ListPathAspects { - public static final Log LOG = LogFactory.getLog(ListPathAspects.class); - - pointcut callGetListing(FSNamesystem fd, String src, - byte[] startAfter, boolean needLocation) : - call(DirectoryListing FSNamesystem.getListing(String, byte[], boolean)) - && target(fd) - && args(src, startAfter, needLocation); - - after(FSNamesystem fd, String src, byte[] startAfter, boolean needLocation) - throws IOException, UnresolvedLinkException: - callGetListing(fd, src, startAfter, needLocation) { - LOG.info("FI: callGetListing"); - fd.delete(src, true); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a18336/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj b/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj deleted file mode 100644 index cc64053..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/aop/org/apache/hadoop/hdfs/server/namenode/RenameAspects.aj +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import org.apache.commons.logging.*; -import org.apache.hadoop.hdfs.server.namenode.INode; -import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.TestFiRename; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; - -/** - * The aspects here are used for testing HDFS implementation of rename - * functionality. Failure is introduced during rename to test the atomicity of - * rename. - */ -public privileged aspect RenameAspects { - public static final Log LOG = LogFactory.getLog(RenameAspects.class); - - /** When removeChild is called during rename, throw exception */ - pointcut callRemove(INode[] inodes, int pos) : - call(* FSDirectory.removeChild(INode[], int)) - && args(inodes, pos) - && withincode (* FSDirectory.unprotectedRenameTo(String, - String, long, Rename...)); - - before(INode[] inodes, int pos) throws RuntimeException : - callRemove(inodes, pos) { - LOG.info("FI: callRenameRemove"); - if (TestFiRename.throwExceptionOnRemove(inodes[pos].getLocalName())) { - throw new RuntimeException("RenameAspects - on remove " + - inodes[pos].getLocalName()); - } - } - - /** When addChildNoQuotaCheck is called during rename, throw exception */ - pointcut callAddChildNoQuotaCheck(INode[] inodes, int pos, INode node, long diskspace, boolean flag) : - call(* FSDirectory.addChildNoQuotaCheck(INode[], int, INode, long, boolean)) - && args(inodes, pos, node, diskspace, flag) - && withincode (* FSDirectory.unprotectedRenameTo(String, - String, long, Rename...)); - - before(INode[] inodes, int pos, INode node, long diskspace, boolean flag) - throws RuntimeException : - callAddChildNoQuotaCheck(inodes, pos, node, diskspace, flag) { - LOG.info("FI: callAddChildNoQuotaCheck"); - if (TestFiRename.throwExceptionOnAdd(inodes[pos].getLocalName())) { - throw new RuntimeException("RenameAspects on add " + - inodes[pos].getLocalName()); - } - } -}