Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Jan 
17 10:11:35 2008
@@ -465,6 +465,19 @@
   protected static File getMetaFile( File f ) {
     return new File( f.getAbsolutePath() + METADATA_EXTENSION );
   }
+
+  static class ActiveFile {
+    File file;
+    List<Thread> threads = new ArrayList<Thread>(2);
+
+    ActiveFile(File f, List<Thread> list) {
+      file = f;
+      if (list != null) {
+        threads.addAll(list);
+      }
+      threads.add(Thread.currentThread());
+    }
+  } 
   
   protected File getMetaFile(Block b) throws IOException {
     File blockFile = getBlockFile( b );
@@ -487,7 +500,7 @@
   }
     
   FSVolumeSet volumes;
-  private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
-  private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
   private HashMap<Block,FSVolume> volumeMap = null;
   private HashMap<Block,File> blockMap = null;
@@ -570,20 +583,34 @@
   }
     
   BlockWriteStreams createBlockWriteStreams( File f ) throws IOException {
-      return new BlockWriteStreams(new FileOutputStream(f),
-          new FileOutputStream( getMetaFile( f ) ));
+      return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
+          new FileOutputStream( new RandomAccessFile( getMetaFile( f ) , "rw" ).getFD() ));
 
   }
   
   /**
    * Start writing to a block file
+   * If isRecovery is true and the block pre-exists, then we kill all
+   * other threads that might be writing to this block, and then reopen the file.
    */
-  public BlockWriteStreams writeToBlock(Block b) throws IOException {
+  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
     //
     // Make sure the block isn't a valid one - we're still creating it!
     //
     if (isValidBlock(b)) {
-      throw new IOException("Block " + b + " is valid, and cannot be written 
to.");
+      if (!isRecovery) {
+        throw new IOException("Block " + b + " is valid, and cannot be written 
to.");
+      }
+      // If the block was successfully finalized because all packets
+      // were successfully processed at the Datanode but the ack for
+      // some of the packets was not received by the client, the client
+      // re-opens the connection and retries sending those packets. The
+      // client will now fail because this datanode has no way of
+      // unfinalizing this block.
+      //
+      throw new IOException("Reopen Block " + b + " is valid, and cannot be written to.");
     }
     long blockSize = b.getNumBytes();
 
@@ -591,33 +618,56 @@
     // Serialize access to /tmp, and check if file already there.
     //
     File f = null;
+    List<Thread> threads = null;
     synchronized (this) {
       //
       // Is it already in the create process?
       //
-      if (ongoingCreates.containsKey(b)) {
-        // check how old is the temp file - wait 1 hour
-        File tmp = ongoingCreates.get(b);
-        if ((System.currentTimeMillis() - tmp.lastModified()) < 
-            blockWriteTimeout) {
-          throw new IOException("Block " + b +
-                                " has already been started (though not completed), and thus cannot be created.");
+      ActiveFile activeFile = ongoingCreates.get(b);
+      if (activeFile != null) {
+        f = activeFile.file;
+        threads = activeFile.threads;
+        
+        if (!isRecovery) {
+          // check how old is the temp file - wait 1 hour
+          if ((System.currentTimeMillis() - f.lastModified()) < 
+              blockWriteTimeout) {
+            throw new IOException("Block " + b +
+                                  " has already been started (though not completed), and thus cannot be created.");
+          } else {
+            // stale temp file - remove
+            if (!f.delete()) {
+              throw new IOException("Can't write the block - unable to remove stale temp file " + f);
+            }
+            f = null;
+          }
         } else {
-          // stale temp file - remove
-          if (!tmp.delete()) {
-            throw new IOException("Can't write the block - unable to remove stale temp file " + tmp);
+          for (Thread thread:threads) {
+            thread.interrupt();
           }
-          ongoingCreates.remove(b);
         }
+        ongoingCreates.remove(b);
       }
       FSVolume v = null;
-      synchronized (volumes) {
-        v = volumes.getNextVolume(blockSize);
-        // create temporary file to hold block in the designated volume
-        f = createTmpFile(v, b);
+      if (!isRecovery) {
+        synchronized (volumes) {
+          v = volumes.getNextVolume(blockSize);
+          // create temporary file to hold block in the designated volume
+          f = createTmpFile(v, b);
+        }
+        volumeMap.put(b, v);
       }
-      ongoingCreates.put(b, f);
-      volumeMap.put(b, v);
+      ongoingCreates.put(b, new ActiveFile(f, threads));
+    }
+
+    try {
+      if (threads != null) {
+        for (Thread thread:threads) {
+          thread.join();
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Recovery waiting for thread interrupted.");
     }
 
     //
@@ -628,6 +678,29 @@
     return createBlockWriteStreams( f );
   }
 
+  /**
+   * Retrieves the offset in the block to which the
+   * the next write will write data to.
+   */
+  public long getChannelPosition(Block b, BlockWriteStreams streams) 
+                                 throws IOException {
+    FileOutputStream file = (FileOutputStream) streams.dataOut;
+    return file.getChannel().position();
+  }
+
+  /**
+   * Sets the offset in the block to which the
+   * the next write will write data to.
+   */
+  public void setChannelPosition(Block b, BlockWriteStreams streams, 
+                                 long dataOffset, long ckOffset) 
+                                 throws IOException {
+    FileOutputStream file = (FileOutputStream) streams.dataOut;
+    file.getChannel().position(dataOffset);
+    file = (FileOutputStream) streams.checksumOut;
+    file.getChannel().position(ckOffset);
+  }
+
   File createTmpFile( FSVolume vol, Block blk ) throws IOException {
     if ( vol == null ) {
       synchronized ( this ) {
@@ -654,7 +727,7 @@
    * Complete the block write!
    */
   public synchronized void finalizeBlock(Block b) throws IOException {
-    File f = ongoingCreates.get(b);
+    File f = ongoingCreates.get(b).file;
     if (f == null || !f.exists()) {
       throw new IOException("No temporary file " + f + " for block " + b);
     }
@@ -669,6 +742,15 @@
   }
 
   /**
+   * Remove the temporary block file (if any)
+   */
+  public synchronized void unfinalizeBlock(Block b) throws IOException {
+    ongoingCreates.remove(b);
+    volumeMap.remove(b);
+    DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+  }
+
+  /**
    * Return a table of block data
    */
   public Block[] getBlockReport() {
@@ -773,6 +855,10 @@
 
   public String toString() {
     return "FSDataset{dirpath='"+volumes+"'}";
+  }
+
+  public long getBlockSize(Block b) {
+    return blockMap.get(b).length();
   }
 
 }

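For reference, here is a minimal sketch (not part of this commit) of how a caller
might drive the new recovery path in FSDataset. It assumes it lives in the
org.apache.hadoop.dfs package, that BlockWriteStreams is the stream-pair class
declared alongside FSDatasetInterface, and that the block and resume offsets are
supplied by the caller; it is an illustration, not the datanode's actual code.

    package org.apache.hadoop.dfs;

    import java.io.IOException;

    // Hypothetical helper for illustration only.
    class BlockRecoverySketch {
      static void reopenForRecovery(FSDatasetInterface data, Block b,
                                    long dataOffset, long checksumOffset)
          throws IOException {
        // isRecovery=true interrupts and waits for any threads already writing
        // this block (tracked by the new ActiveFile entry), then reopens the file.
        FSDatasetInterface.BlockWriteStreams streams = data.writeToBlock(b, true);
        // Rewind both channels to where the retransmitted packets should land.
        data.setChannelPosition(b, streams, dataOffset, checksumOffset);
        // ... retried packets are written to streams.dataOut / streams.checksumOut ...
        data.finalizeBlock(b);
      }
    }
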
Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDatasetInterface.java 
Thu Jan 17 10:11:35 2008
@@ -147,11 +147,12 @@
   /**
    * Creates the block and returns output streams to write data and CRC
    * @param b
+   * @param isRecovery True if this is part of error recovery, otherwise false
    * @return a BlockWriteStreams object to allow writing the block data
    *  and CRC
    * @throws IOException
    */
-  public BlockWriteStreams writeToBlock(Block b) throws IOException;
+  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
 
   /**
    * Finalizes the block previously opened for writing using writeToBlock.
@@ -163,6 +164,14 @@
   public void finalizeBlock(Block b) throws IOException;
 
   /**
+   * Unfinalizes the block previously opened for writing using writeToBlock.
+   * The temporary file associated with this block is deleted.
+   * @param b
+   * @throws IOException
+   */
+  public void unfinalizeBlock(Block b) throws IOException;
+
+  /**
    * Returns the block report - the full list of blocks stored
    * @return - the block report - the full list of blocks stored
    */
@@ -192,5 +201,28 @@
      * Stringifies the name of the storage
      */
   public String toString();
+
+  /**
+   * Returns the current offset in the data stream.
+   * @param b
+   * @param stream The stream to the data file and checksum file
+   * @return the position of the file pointer in the data stream
+   * @throws IOException
+   */
+  public long getChannelPosition(Block b, BlockWriteStreams stream) throws IOException;
+
+  /**
+   * Sets the file pointer of the data stream and checksum stream to
+   * the specified values.
+   * @param b
+   * @param stream The stream for the data file and checksum file
+   * @param dataOffset The position to which the file pointer for the data stream
+   *        should be set
+   * @param ckOffset The position to which the file pointer for the checksum stream
+   *        should be set
+   * @throws IOException
+   */
+  public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset,
+                                 long ckOffset) throws IOException;
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu 
Jan 17 10:11:35 2008
@@ -2430,12 +2430,13 @@
       //
       if (!isInSafeMode()) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                      +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName());
+                                      +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()+" size "+block.getNumBytes());
       }
     } else {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
                                    + "Redundant addStoredBlock request 
received for " 
-                                   + block.getBlockName() + " on " + 
node.getName());
+                                   + block.getBlockName() + " on " + 
node.getName()
+                                   + " size " + block.getNumBytes());
     }
 
     //

Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java 
Thu Jan 17 10:11:35 2008
@@ -23,6 +23,8 @@
 * buffers output through a {@link BufferedOutputStream} and creates a checksum file. */
 public class FSDataOutputStream extends DataOutputStream {
+  private OutputStream wrappedStream;
+
   private static class PositionCache extends FilterOutputStream {
     long position;
 
@@ -53,6 +55,7 @@
   public FSDataOutputStream(OutputStream out)
     throws IOException {
     super(new PositionCache(out));
+    wrappedStream = out;
   }
   
   public long getPos() throws IOException {
@@ -62,5 +65,10 @@
   public void close() throws IOException {
     flush();
     out.close();
+  }
+
+  // Returns the underlying output stream. This is used by unit tests.
+  public OutputStream getWrappedStream() {
+    return wrappedStream;
   }
 }

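A short sketch of how a unit test might use the new getWrappedStream() accessor.
The cast to DFSClient.DFSOutputStream and the setArtificialSlowdown() hook mirror
their use in TestDatanodeDeath later in this commit; the FileSystem instance and
path name here are assumptions.

    // Inside a test in the org.apache.hadoop.dfs package, with a FileSystem fs
    // obtained from a MiniDFSCluster.
    FSDataOutputStream stm = fs.create(new Path("/test/wrapped.dat"));
    DFSClient.DFSOutputStream dfstream =
        (DFSClient.DFSOutputStream) stm.getWrappedStream();
    dfstream.setArtificialSlowdown(1000);   // slow the writer down for the test
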
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Daemon.java Thu Jan 17 
10:11:35 2008
@@ -38,6 +38,13 @@
     this.setName(((Object)runnable).toString());
   }
 
+  /** Construct a daemon thread to be part of a specified thread group. */
+  public Daemon(ThreadGroup group, Runnable runnable) {
+    super(group, runnable);
+    this.runnable = runnable;
+    this.setName(((Object)runnable).toString());
+  }
+
   public Runnable getRunnable() {
     return runnable;
   }

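The new constructor lets callers place a daemon thread in an explicit ThreadGroup,
so a set of related workers can be inspected or interrupted as a group. A minimal
sketch, with a made-up group name and runnable:

    import org.apache.hadoop.util.Daemon;

    ThreadGroup writers = new ThreadGroup("writers");     // hypothetical group name
    Daemon worker = new Daemon(writers, new Runnable() {
      public void run() {
        // ... long-running background work ...
      }
    });
    worker.start();
    // The whole group can later be examined or interrupted via
    // writers.activeCount() and writers.interrupt().
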
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java 
(original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu 
Jan 17 10:11:35 2008
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
@@ -37,9 +38,24 @@
  */
 public class MiniDFSCluster {
 
+  private class DataNodeProperties {
+    DataNode datanode;
+    Configuration conf;
+    String[] dnArgs;
+
+    DataNodeProperties(DataNode node, Configuration conf, String[] args) {
+      this.datanode = node;
+      this.conf = conf;
+      this.dnArgs = args;
+    }
+  }
+
   private Configuration conf;
   private NameNode nameNode;
-  private ArrayList<DataNode> dataNodes = new ArrayList<DataNode>();
+  private int numDataNodes;
+  private int curDatanodesNum = 0;
+  private ArrayList<DataNodeProperties> dataNodes = 
+                         new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
   
@@ -214,7 +230,7 @@
    *
    * @throws IllegalStateException if NameNode has been shutdown
    */
-  public void startDataNodes(Configuration conf, int numDataNodes, 
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
                              boolean manageDfsDirs, StartupOption operation, 
                              String[] racks,
                              long[] simulatedCapacities) throws IOException {
@@ -255,7 +271,6 @@
                     null : new String[] {"-"+operation.toString()};
     String [] dnArgs = (operation == StartupOption.UPGRADE) ? null : args;
     
-    int curDatanodesNum = dataNodes.size();
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new Configuration(conf);
       if (manageDfsDirs) {
@@ -279,8 +294,13 @@
       }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
-      dataNodes.add(DataNode.createDataNode(dnArgs, dnConf));
+      Configuration newconf = new Configuration(dnConf); // save config
+      dataNodes.add(new DataNodeProperties(
+                     DataNode.createDataNode(dnArgs, dnConf), 
+                     newconf, dnArgs));
     }
+    curDatanodesNum += numDataNodes;
+    this.numDataNodes += numDataNodes;
   }
   
   
@@ -334,7 +354,12 @@
    * Gets a list of the started DataNodes.  May be empty.
    */
   public ArrayList<DataNode> getDataNodes() {
-    return dataNodes;
+    ArrayList<DataNode> list = new ArrayList<DataNode>();
+    for (int i = 0; i < dataNodes.size(); i++) {
+      DataNode node = dataNodes.get(i).datanode;
+      list.add(node);
+    }
+    return list;
   }
   
   /**
@@ -365,9 +390,67 @@
   public void shutdownDataNodes() {
     for (int i = dataNodes.size()-1; i >= 0; i--) {
       System.out.println("Shutting down DataNode " + i);
-      DataNode dn = dataNodes.remove(i);
+      DataNode dn = dataNodes.remove(i).datanode;
       dn.shutdown();
+      numDataNodes--;
+    }
+  }
+
+  /*
+   * Shutdown a particular datanode
+   */
+  boolean stopDataNode(int i) {
+    if (i < 0 || i >= dataNodes.size()) {
+      return false;
+    }
+    DataNode dn = dataNodes.remove(i).datanode;
+    System.out.println("MiniDFSCluster Stopping DataNode " + 
+                       dn.dnRegistration.getName() +
+                       " from a total of " + (dataNodes.size() + 1) + 
+                       " datanodes.");
+    dn.shutdown();
+    numDataNodes--;
+    return true;
+  }
+
+  /*
+   * Restart a particular datanode
+   */
+  synchronized boolean restartDataNode(int i) throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return false;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    Configuration conf = dnprop.conf;
+    String[] args = dnprop.dnArgs;
+    System.out.println("MiniDFSCluster Restart DataNode " + 
+                       dn.dnRegistration.getName() +
+                       " from a total of " + (dataNodes.size() + 1) + 
+                       " datanodes.");
+    dn.shutdown();
+
+    // recreate new datanode with the same configuration as the one
+    // that was stopped.
+    Configuration newconf = new Configuration(conf); // save cloned config
+    dataNodes.add(new DataNodeProperties(
+                     DataNode.createDataNode(args, conf), 
+                     newconf, args));
+    return true;
+  }
+
+  /*
+   * Shutdown a datanode by name.
+   */
+  synchronized boolean stopDataNode(String name) {
+    int i;
+    for (i = 0; i < dataNodes.size(); i++) {
+      DataNode dn = dataNodes.get(i).datanode;
+      if (dn.dnRegistration.getName().equals(name)) {
+        break;
+      }
     }
+    return stopDataNode(i);
   }
   
   /**
@@ -423,7 +506,7 @@
 
     // make sure all datanodes are alive
     while( client.datanodeReport(DatanodeReportType.LIVE).length
-        != dataNodes.size()) {
+        != numDataNodes) {
       try {
         Thread.sleep(500);
       } catch (Exception e) {
@@ -450,7 +533,7 @@
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    return dataNodes.get(dataNodeIndex).getFSDataset().getBlockReport();
+    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
   }
   
   
@@ -482,13 +565,13 @@
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).getFSDataset();
+    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
     if (!(dataSet instanceof SimulatedFSDataset)) {
       throw new IOException("injectBlocks is valid only for 
SimilatedFSDataset");
     }
     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
     sdataset.injectBlocks(blocksToInject);
-    dataNodes.get(dataNodeIndex).scheduleBlockReport(0);
+    dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
   }
   
   /**

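A sketch of how a test in the same package might use the new datanode control
hooks (stopDataNode and restartDataNode are package-private, and restartDataNode
throws IOException); the cluster size and indices here are arbitrary:

    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    cluster.waitActive();

    // Restart one datanode in place; the Configuration and startup arguments
    // saved in DataNodeProperties are reused for the new instance.
    cluster.restartDataNode(0);

    // Or stop a specific datanode by its registration name.
    String name = cluster.getDataNodes().get(1).dnRegistration.getName();
    cluster.stopDataNode(name);

    cluster.shutdown();
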
Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java 
(original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/SimulatedFSDataset.java 
Thu Jan 17 10:11:35 2008
@@ -97,6 +97,14 @@
         return theBlock.len;
       }
     }
+
+    synchronized void setlength(long length) {
+      if (!finalized) {
+         oStream.setLength(length);
+      } else {
+        theBlock.len = length;
+      }
+    }
     
     synchronized SimulatedInputStream getIStream() throws IOException {
       if (!finalized) {
@@ -145,6 +153,10 @@
     SimulatedInputStream getMetaIStream() {
       return new SimulatedInputStream(nullCrcFileData);  
     }
+
+    synchronized boolean isFinalized() {
+      return finalized;
+    }
   }
   
   static private class SimulatedStorage {
@@ -234,6 +246,10 @@
 
   }
 
+  public synchronized void unfinalizeBlock(Block b) throws IOException {
+    blockMap.remove(b);
+  }
+
   public synchronized Block[] getBlockReport() {
     Block[] blockTable = new Block[blockMap.size()];
     int i = 0;
@@ -287,14 +303,20 @@
   }
 
   public synchronized boolean isValidBlock(Block b) {
-    return (blockMap.containsKey(b));
+    // return (blockMap.containsKey(b));
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      return false;
+    }
+    return binfo.isFinalized();
   }
 
   public String toString() {
     return "Simulated FSDataset";
   }
 
-  public synchronized BlockWriteStreams writeToBlock(Block b)
+  public synchronized BlockWriteStreams writeToBlock(Block b, 
+                                            boolean isRecovery)
                                             throws IOException {
     if (isValidBlock(b)) {
           throw new IOException("Block " + b + 
@@ -374,8 +396,27 @@
   public void checkDataDir() throws DiskErrorException {
     // nothing to check for simulated data set
   }
-  
-  
+
+  public synchronized long getChannelPosition(Block b, 
+                                              BlockWriteStreams stream)
+                                              throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("No such Block " + b );
+    }
+    return binfo.getlength();
+  }
+
+  public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, 
+                                              long dataOffset, long ckOffset)
+                                              throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null) {
+      throw new IOException("No such Block " + b );
+    }
+    binfo.setlength(dataOffset);
+  }
+
   /** 
    * Simulated input and output streams
    *
@@ -470,10 +511,16 @@
     
     /**
      * 
-     * @return the lenght of the data created so far.
+     * @return the length of the data created so far.
      */
     long getLength() {
       return length;
+    }
+
+    /**
+     */
+    void setLength(long length) {
+      this.length = length;
     }
     
     @Override

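With this change the simulated dataset, like the real one, reports a block as
valid only after it has been finalized. A rough sketch of the expected semantics,
written against FSDatasetInterface; the helper class, block id and byte written
are made up for illustration:

    package org.apache.hadoop.dfs;

    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical illustration, not part of the patch.
    class FinalizeSemanticsSketch {
      static void writeAndFinalize(FSDatasetInterface fsd) throws IOException {
        Block b = new Block(1, 0);
        OutputStream dataOut = fsd.writeToBlock(b, false).dataOut; // ordinary write
        dataOut.write(1);
        dataOut.close();
        // Up to this point isValidBlock(b) is expected to return false.
        fsd.finalizeBlock(b);
        // Now the block is valid; unfinalizeBlock(b) would instead discard an
        // in-progress block and its temporary file.
      }
    }
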
Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
 (original)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
 Thu Jan 17 10:11:35 2008
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
@@ -46,11 +47,13 @@
   
   DatanodeID datanode;
   InetSocketAddress dnAddr;
-  byte[] sendBuf = new byte[128];
-  byte[] recvBuf = new byte[128];
-  ByteBuffer byteBuf = ByteBuffer.wrap(sendBuf);
-  ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf);
-  
+  ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
+  DataOutputStream sendOut = new DataOutputStream(sendBuf);
+  // byte[] recvBuf = new byte[128];
+  // ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf);
+  ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
+  DataOutputStream recvOut = new DataOutputStream(recvBuf);
+
   private void sendRecvData(String testDescription,
                             boolean eofExpected) throws IOException {
     /* Opens a socket to datanode
@@ -73,10 +76,10 @@
       
       OutputStream out = sock.getOutputStream();
       // Should we excuse 
-      out.write(sendBuf, 0, byteBuf.position());
-      byte[] retBuf = new byte[recvByteBuf.position()];
+      byte[] retBuf = new byte[recvBuf.size()];
       
       DataInputStream in = new DataInputStream(sock.getInputStream());
+      out.write(sendBuf.toByteArray());
       try {
         in.readFully(retBuf);
       } catch (EOFException eof) {
@@ -86,6 +89,10 @@
         }
         throw eof;
       }
+      for (int i=0; i<retBuf.length; i++) {
+        System.out.print(retBuf[i]);
+      }
+      System.out.println(":");
       
       if (eofExpected) {
         throw new IOException("Did not recieve IOException when an exception " 
+
@@ -93,8 +100,10 @@
                               datanode.getName());
       }
       
+      byte[] needed = recvBuf.toByteArray();
       for (int i=0; i<retBuf.length; i++) {
-        assertEquals("checking byte[" + i + "]", recvBuf[i], retBuf[i]);
+        System.out.print(retBuf[i]);
+        assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
       }
     } finally {
       IOUtils.closeSocket(sock);
@@ -139,91 +148,165 @@
     Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
     long newBlockId = firstBlock.getBlockId() + 1;
 
-    recvByteBuf.position(1);
-    byteBuf.position(0);
+    recvBuf.reset();
+    sendBuf.reset();
     
-    int versionPos = 0;
-    byteBuf.putShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    // bad version
+    recvOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    sendOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
     sendRecvData("Wrong Version", true);
-    // correct the version
-    byteBuf.putShort(versionPos, (short)FSConstants.DATA_TRANFER_VERSION);
-    
-    int opPos = byteBuf.position();
-    byteBuf.put((byte)(FSConstants.OP_WRITE_BLOCK-1));
+
+    // bad ops
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
     sendRecvData("Wrong Op Code", true);
     
     /* Test OP_WRITE_BLOCK */
-    
-    byteBuf.position(opPos);
-    // Initially write correct values
-    byteBuf.put((byte)FSConstants.OP_WRITE_BLOCK);
-    int blockPos = byteBuf.position();
-    byteBuf.putLong(newBlockId);
-    int targetPos = byteBuf.position();
-    byteBuf.putInt(0);
-    int checksumPos = byteBuf.position();
-    byteBuf.put((byte)DataChecksum.CHECKSUM_CRC32);
-    
-    byteBuf.putInt(-1-random.nextInt(oneMil));
-    recvByteBuf.position(0);
-    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
-    sendRecvData("wrong bytesPerChecksum while writing", false);
-    byteBuf.putInt(checksumPos+1, 512);
-    
-    byteBuf.putInt(targetPos, -1-random.nextInt(oneMil));
-    sendRecvData("bad targets len while writing", true);
-    byteBuf.putInt(targetPos, 0);
-    
-    byteBuf.putLong(blockPos, ++newBlockId);
-    int dataChunkPos = byteBuf.position();
-    byteBuf.putInt(-1-random.nextInt(oneMil));
-    recvByteBuf.position(0);
-    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);//err ret expected.
-    sendRecvData("negative DATA_CHUNK len while writing", false);
-    byteBuf.putInt(dataChunkPos, 0);
-    
-    byteBuf.putInt(0); // zero checksum
-    byteBuf.putLong(blockPos, ++newBlockId);    
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
+    sendOut.writeLong(newBlockId); // block id
+    sendOut.writeInt(0);           // targets in pipeline 
+    sendOut.writeBoolean(false);   // recoveryFlag
+    Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeInt(0);           // number of downstream targets
+    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+    
+    // bad bytes per checksum
+    sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvBuf.reset();
+    recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("wrong bytesPerChecksum while writing", true);
+
+    sendBuf.reset();
+    recvBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
+    sendOut.writeLong(newBlockId);
+    sendOut.writeInt(0);           // targets in pipeline 
+    sendOut.writeBoolean(false);   // recoveryFlag
+    Text.writeString(sendOut, "cl");// clientID
+
+    // bad number of targets
+    sendOut.writeInt(-1-random.nextInt(oneMil));
+    recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("bad targets len while writing block " + newBlockId, true);
+
+    sendBuf.reset();
+    recvBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
+    sendOut.writeLong(++newBlockId);
+    sendOut.writeInt(0);           // targets in pipeline 
+    sendOut.writeBoolean(false);   // recoveryFlag
+    Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeInt(0);
+    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+    sendOut.writeInt((int)512);
+    sendOut.writeInt(20);          // size of packet
+    sendOut.writeLong(0);          // OffsetInBlock
+    sendOut.writeLong(100);        // sequencenumber
+    sendOut.writeBoolean(false);   // lastPacketInBlock
+    
+    // bad data chunk length
+    sendOut.writeInt(-1-random.nextInt(oneMil));
+    Text.writeString(recvOut, ""); // first bad node
+    recvOut.writeLong(100);        // sequencenumber
+    recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
+                 true);
+
+    // test for writing a valid zero size block
+    sendBuf.reset();
+    recvBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
+    sendOut.writeLong(++newBlockId);
+    sendOut.writeInt(0);           // targets in pipeline 
+    sendOut.writeBoolean(false);   // recoveryFlag
+    Text.writeString(sendOut, "cl");// clientID
+    sendOut.writeInt(0);
+    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+    sendOut.writeInt((int)512);    // checksum size
+    sendOut.writeInt(20);          // size of packet
+    sendOut.writeLong(0);          // OffsetInBlock
+    sendOut.writeLong(100);        // sequencenumber
+    sendOut.writeBoolean(true);    // lastPacketInBlock
+
+    sendOut.writeInt(0);           // chunk length
+    sendOut.writeInt(0);           // zero checksum
     //ok finally write a block with 0 len
-    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_SUCCESS);
-    sendRecvData("Writing a zero len block", false);
-    
+    Text.writeString(recvOut, ""); // first bad node
+    recvOut.writeLong(100);        // sequencenumber
+    recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);
+    sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */
-    
-    byteBuf.position(opPos);
-    byteBuf.put((byte)FSConstants.OP_READ_BLOCK);
-    blockPos = byteBuf.position();
+
+    // bad block id
+    sendBuf.reset();
+    recvBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     newBlockId = firstBlock.getBlockId()-1;
-    byteBuf.putLong(newBlockId);
-    int startOffsetPos = byteBuf.position();
-    byteBuf.putLong(0L);
-    int lenPos = byteBuf.position();
-    byteBuf.putLong(fileLen);
-    recvByteBuf.position(0);
-    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
-    sendRecvData("Wrong block ID for read", false); 
-    byteBuf.putLong(blockPos, firstBlock.getBlockId());
-    
-    byteBuf.putLong(startOffsetPos, -1-random.nextInt(oneMil));
-    sendRecvData("Negative start-offset for read", false);
-    
-    byteBuf.putLong(startOffsetPos, fileLen);
-    sendRecvData("Wrong start-offset for read", false);
-    byteBuf.putLong(startOffsetPos, 0);
+    sendOut.writeLong(newBlockId);
+    sendOut.writeLong(0L);
+    sendOut.writeLong(fileLen);
+    recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
+
+    // negative block start offset
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
+    sendOut.writeLong(firstBlock.getBlockId());
+    sendOut.writeLong(-1L);
+    sendOut.writeLong(fileLen);
+    sendRecvData("Negative start-offset for read for block " + 
+                 firstBlock.getBlockId(), false);
+
+    // bad block start offset
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
+    sendOut.writeLong(firstBlock.getBlockId());
+    sendOut.writeLong(fileLen);
+    sendOut.writeLong(fileLen);
+    sendRecvData("Wrong start-offset for reading block " +
+                 firstBlock.getBlockId(), false);
     
     // negative length is ok. Datanode assumes we want to read the whole block.
-    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_SUCCESS);    
-    byteBuf.putLong(lenPos, -1-random.nextInt(oneMil));
-    sendRecvData("Negative length for read", false);
-    
-    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR);
-    byteBuf.putLong(lenPos, fileLen+1);
-    sendRecvData("Wrong length for read", false);
-    byteBuf.putLong(lenPos, fileLen);
+    recvBuf.reset();
+    recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);    
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
+    sendOut.writeLong(firstBlock.getBlockId());
+    sendOut.writeLong(0);
+    sendOut.writeLong(-1-random.nextInt(oneMil));
+    sendRecvData("Negative length for reading block " +
+                 firstBlock.getBlockId(), false);
+    
+    // length is more than size of block.
+    recvBuf.reset();
+    recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);    
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
+    sendOut.writeLong(firstBlock.getBlockId());
+    sendOut.writeLong(0);
+    sendOut.writeLong(fileLen + 1);
+    sendRecvData("Wrong length for reading block " +
+                 firstBlock.getBlockId(), false);
     
     //At the end of all this, read the file to make sure that succeeds finally.
+    sendBuf.reset();
+    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
+    sendOut.writeLong(firstBlock.getBlockId());
+    sendOut.writeLong(0);
+    sendOut.writeLong(fileLen);
     readFile(fileSys, file, fileLen);
   }
 }
-

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java?rev=612903&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java 
(added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java 
Thu Jan 17 10:11:35 2008
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * This class tests that file data remains intact and readable even when
+ * datanodes in the write pipeline die while the file is being written.
+ */
+public class TestDatanodeDeath extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int numBlocks = 2;
+  static final int fileSize = numBlocks * blockSize + 1;
+  static final int numDatanodes = 15;
+  static final short replication = 3;
+
+  int numberOfFiles = 3;
+  int numThreads = 5;
+  Workload[] workload = null;
+
+  //
+  // an object that does a bunch of transactions
+  //
+  class Workload extends Thread {
+    private short replication;
+    private int numberOfFiles;
+    private int id;
+    private FileSystem fs;
+    private long stamp;
+
+    Workload(FileSystem fs, int threadIndex, int numberOfFiles, 
+             short replication, long stamp) {
+      id = threadIndex;
+      this.fs = fs;
+      this.numberOfFiles = numberOfFiles;
+      this.replication = replication;
+      this.stamp = stamp;
+    }
+
+    // create a bunch of files. Write to them and then verify.
+    public void run() {
+      System.out.println("Workload starting ");
+      for (int i = 0; i < numberOfFiles; i++) {
+        Path filename = new Path(id + "." + i);
+        long myseed = seed + id + i;
+        try {
+          System.out.println("Workload processing file " + filename);
+          FSDataOutputStream stm = createFile(fs, filename, replication);
+          DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
+                                                 (stm.getWrappedStream());
+          dfstream.setArtificialSlowdown(1000);
+          writeFile(stm, myseed);
+          stm.close();
+          checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
+        } catch (Throwable e) {
+          System.out.println("Workload exception " + e);
+          assertTrue(e.toString(), false);
+        }
+
+        // increment the stamp to indicate that another file is done.
+        synchronized (this) {
+          stamp++;
+        }
+      }
+    }
+
+    public synchronized void resetStamp() {
+      this.stamp = 0;
+    }
+
+    public synchronized long getStamp() {
+      return stamp;
+    }
+  }
+
+  //
+  // creates a file and returns a descriptor for writing to it.
+  //
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, short repl)
+    throws IOException {
+    // create and write a file that contains three blocks of data
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            repl, (long)blockSize);
+    return stm;
+  }
+
+  //
+  // writes to file
+  //
+  private void writeFile(FSDataOutputStream stm, long seed) throws IOException {
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    int mid = fileSize/2;
+    stm.write(buffer, 0, mid);
+    stm.write(buffer, mid, fileSize - mid);
+  }
+
+
+  // wait till this block is confirmed by the datanodes. 
+  private void waitBlockConfirmation(FileSystem fileSys, Path name, 
+                                     int repl, int blockNumber) 
+                                     throws IOException {
+    boolean done = false;
+    long start = blockSize * blockNumber;
+    long end = blockSize * (blockNumber + 1) -1;
+
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      done = true;
+      String[][] locations = fileSys.getFileCacheHints(name, start, end);
+      if (locations.length < 1) {
+        done = false;
+        continue;
+      }
+      if (locations[0].length < repl) {
+        done = false;
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Returns the datanode locations (the write pipeline) for the specified
+   * block number of the given file.
+   */
+  private DatanodeInfo[] getPipeline(FileSystem fileSys, Path name, 
+                                     short repl,
+                                     int blockNumber) 
+                                     throws IOException {
+    // need a raw stream
+    assertTrue("Not HDFS:"+fileSys.getUri(), 
+               fileSys instanceof DistributedFileSystem);
+
+    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
+      ((DistributedFileSystem)fileSys).open(name);
+    Collection<LocatedBlock> dinfo = dis.getAllBlocks();
+    int num = 0;
+    DatanodeInfo[] status = null;
+
+    for (LocatedBlock blk : dinfo) { // for each block
+      int hasdown = 0;
+      DatanodeInfo[] nodes = blk.getLocations();
+      for (int j = 0; j < nodes.length; j++) {     // for each replica
+        System.out.println("Block " + blk.getBlock() + " replica " +
+                           nodes[j].getName());
+      }
+      if (blockNumber == num) {
+        status = nodes;
+      }
+      num++;
+    }
+    return status;
+  }
+
+  //
+  // verify that the data written are sane
+  // 
+  private void checkFile(FileSystem fileSys, Path name, int repl,
+                         int numblocks, int filesize, long seed)
+    throws IOException {
+    boolean done = false;
+    int attempt = 0;
+
+    long len = fileSys.getFileStatus(name).getLen();
+    assertTrue(name + " should be of size " + filesize +
+               " but found to be of size " + len, 
+               len == filesize);
+
+    // wait till all full blocks are confirmed by the datanodes.
+    while (!done) {
+      attempt++;
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      done = true;
+      String[][] locations = fileSys.getFileCacheHints(name, 0, filesize);
+      if (locations.length < numblocks) {
+        if (attempt > 100) {
+          System.out.println("File " + name + " has only " +
+                             locations.length + " blocks, " +
+                             " but is expected to have " + numblocks +
+                             " blocks.");
+        }
+        done = false;
+        continue;
+      }
+      for (int idx = 0; idx < locations.length; idx++) {
+        if (locations[idx].length < repl) {
+          if (attempt > 100) {
+            System.out.println("File " + name + " has " +
+                               locations.length + " blocks: " +
+                               " The " + idx + " block has only " +
+                               locations[idx].length + " replicas " +
+                               " but is expected to have " + repl +
+                               " replicas.");
+          }
+          done = false;
+          break;
+        }
+      }
+    }
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[filesize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read the file
+    byte[] actual = new byte[filesize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, expected, "Read 1");
+  }
+
+  private void checkData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                        expected[from+idx]+" actual "+actual[idx],
+                        actual[idx], expected[from+idx]);
+      actual[idx] = 0;
+    }
+  }
+
+  /**
+   * A class that kills one datanode and recreates a new one. It waits to
+   * ensure that all workers have finished at least one file since the 
+   * last kill of a datanode. This guarantees that all three replicas of
+   * a block do not get killed (otherwise the file will be corrupt and the
+   * test will fail).
+   */
+  class Modify extends Thread {
+    Random rand;
+    volatile boolean running;
+    MiniDFSCluster cluster;
+    Configuration conf;
+
+    Modify(Configuration conf, MiniDFSCluster cluster) {
+      rand = new Random();
+      running = true;
+      this.cluster = cluster;
+      this.conf = conf;
+    }
+
+    public void run() {
+
+      while (running) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          continue;
+        }
+
+        // check if all threads have a new stamp. 
+        // If so, then all workers have finished at least one file
+        // since the last stamp.
+        boolean loop = false;
+        for (int i = 0; i < numThreads; i++) {
+          if (workload[i].getStamp() == 0) {
+            loop = true;
+            break;
+          }
+        }
+        if (loop) {
+          continue;
+        }
+
+        // Now it is guaranteed that there will be at least one valid
+        // replica of a file.
+
+        for (int i = 0; i < replication - 1; i++) {
+          // pick a random datanode to shutdown
+          int victim = rand.nextInt(numDatanodes);
+          try {
+            System.out.println("Stopping datanode " + victim);
+            cluster.restartDataNode(victim);
+            // cluster.startDataNodes(conf, 1, true, null, null);
+          } catch (IOException e) {
+            System.out.println("TestDatanodeDeath Modify exception " + e);
+            assertTrue("TestDatanodeDeath Modify exception " + e, false);
+            running = false;
+          }
+        }
+
+        // set a new stamp for all workers
+        for (int i = 0; i < numThreads; i++) {
+          workload[i].resetStamp();
+        }
+      }
+    }
+
+    // Make the thread exit.
+    void close() {
+      running = false;
+      this.interrupt();
+    }
+  }
+
+  /**
+   * Test that writing to files is good even when datanodes in the pipeline
+   * dies.
+   */
+  private void complexTest() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 2000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.replication.pending.timeout.sec", 2);
+    conf.setInt("dfs.socket.timeout", 5000);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    Modify modThread = null;
+
+    try {
+      
+      // Create threads and make them run workload concurrently.
+      workload = new Workload[numThreads];
+      for (int i = 0; i < numThreads; i++) {
+        workload[i] = new Workload(fs, i, numberOfFiles, replication, 0);
+        workload[i].start();
+      }
+
+      // Create a thread that kills existing datanodes and creates new ones.
+      modThread = new Modify(conf, cluster);
+      modThread.start();
+
+      // wait for all transactions to get over
+      for (int i = 0; i < numThreads; i++) {
+        try {
+          System.out.println("Waiting for thread " + i + " to complete...");
+          workload[i].join();
+
+          // if most of the threads are done, then stop restarting datanodes.
+          if (i >= numThreads/2) {
+            modThread.close();
+          }
+         
+        } catch (InterruptedException e) {
+          i--;      // retry
+        }
+      }
+    } finally {
+      if (modThread != null) {
+        modThread.close();
+        try {
+          modThread.join();
+        } catch (InterruptedException e) {}
+      }
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Write to one file, then kill one datanode in the pipeline and then
+   * close the file.
+   */
+  private void simpleTest(int datanodeToKill) throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt("heartbeat.recheck.interval", 2000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.replication.pending.timeout.sec", 2);
+    conf.setInt("dfs.socket.timeout", 5000);
+    int myMaxNodes = 5;
+    System.out.println("SimpleTest starting with DataNode to Kill " + 
+                       datanodeToKill);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, myMaxNodes, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    short repl = 3;
+
+    Path filename = new Path("simpletest.dat");
+    Random rand = new Random();
+    long myseed = rand.nextInt();
+    rand = new Random(myseed);
+    try {
+
+      // create a file and write one block of data
+      System.out.println("SimpleTest creating file " + filename);
+      FSDataOutputStream stm = createFile(fs, filename, repl);
+      DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
+                                             (stm.getWrappedStream());
+
+      // these are test settings
+      int bytesPerChecksum = conf.getInt( "io.bytes.per.checksum", 512); 
+      dfstream.setChunksPerPacket(5);
+      dfstream.setArtificialSlowdown(3000);
+
+      byte[] buffer = new byte[fileSize];
+      rand.nextBytes(buffer);
+      int mid = fileSize/4;
+      stm.write(buffer, 0, mid);
+
+      DatanodeInfo[] targets = dfstream.getPipeline();
+      int count = 5;
+      while (count-- > 0 && targets == null) {
+        try {
+          System.out.println("SimpleTest: Waiting for pipeline to be created.");
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        targets = dfstream.getPipeline();
+      }
+
+      if (targets == null) {
+        int victim = rand.nextInt(myMaxNodes);
+        System.out.println("SimpleTest stopping datanode random " + victim);
+        cluster.stopDataNode(victim);
+      } else {
+        int victim = datanodeToKill;
+        System.out.println("SimpleTest stopping datanode " +
+                            targets[victim].getName());
+        cluster.stopDataNode(targets[victim].getName());
+      }
+      System.out.println("SimpleTest stopping datanode complete");
+
+      // write some more data to file, close and verify
+      stm.write(buffer, mid, fileSize - mid);
+      stm.close();
+
+      checkFile(fs, filename, repl, numBlocks, fileSize, myseed);
+    } catch (Throwable e) {
+      System.out.println("Simple Workload exception " + e);
+      e.printStackTrace();
+      assertTrue(e.toString(), false);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void testDatanodeDeath() throws IOException {
+    for (int i = 0; i < 3; i++) {
+      simpleTest(i); // kills the ith datanode in the pipeline
+    }
+    complexTest();
+  }
+}

Propchange: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java 
(original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileLimit.java Thu 
Jan 17 10:11:35 2008
@@ -39,8 +39,6 @@
 public class TestFileLimit extends TestCase {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
-  static final int numBlocks = 2;
-  static final int fileSize = numBlocks * blockSize;
   boolean simulatedStorage = false;
 
   // The test file is 2 times the blocksize plus one. This means that when the
@@ -59,6 +57,10 @@
     FSDataOutputStream stm = fileSys.create(name, true,
                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
                                             (short)1, (long)blockSize);
+    byte[] buffer = new byte[1024];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
     stm.close();
   }
 

Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java 
(original)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java 
Thu Jan 17 10:11:35 2008
@@ -31,6 +31,7 @@
     }
     conf.set("dfs.replication", "" + fromREP);
     conf.setLong("dfs.blockreport.intervalMsec", 1000L);
+    conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 10, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(), fs instanceof 
DistributedFileSystem);

Modified: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java?rev=612903&r1=612902&r2=612903&view=diff
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java 
(original)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java 
Thu Jan 17 10:11:35 2008
@@ -60,7 +60,7 @@
     int bytesAdded = 0;
     for (int i = 1; i <= NUMBLOCKS; ++i) {
      Block b = new Block(i, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
-      OutputStream dataOut  = fsdataset.writeToBlock(b).dataOut;
+      OutputStream dataOut  = fsdataset.writeToBlock(b, false).dataOut;
       assertEquals(0, fsdataset.getLength(b));
       for (int j=1; j <= blockIdToLen(i); ++j) {
         dataOut.write(j);

