To fix the checksum errors seen on machines with non-ECC memory, I
modified some code in DFSClient.java and DataNode.java.

The idea is very simple.
The original CHUNK structure is
{chunk size}{chunk data}{chunk size}{chunk data}...

The modified CHUNK structure is
{chunk size}{chunk data}{chunk crc}{chunk size}{chunk data}{chunk crc}...

Here is the code.
------------------------
DFSClient.java

import java.util.zip.*;

/**
 * Sends the locally buffered block to a datanode and then starts a fresh
 * backup file for the next block.
 *
 * Each chunk is written as {chunk size}{chunk data}{chunk crc}, where the
 * crc is the low 32 bits of a CRC32 computed over the chunk data. After
 * every chunk one reply byte is read from the datanode: 0x00 means the
 * chunk was accepted, any other value causes the same chunk to be sent
 * again.
 *
 * @throws IOException if sending still fails once the attempts configured
 *         by "dfs.client.block.write.retries" are used up; other calls
 *         made by this method may also throw
 */
private synchronized void endBlock() throws IOException {
      long sleeptime = 400;
      //
      // Done with local copy
      //
      closeBackupStream();
      Checksum sum = new CRC32();

      //
      // Send it to datanode
      //
      boolean sentOk = false;
      int remainingAttempts =
        conf.getInt("dfs.client.block.write.retries", 3);
      while (!sentOk) {
        nextBlockOutputStream();
        InputStream in = new FileInputStream(backupFile);
        try {
          byte buf[] = new byte[BUFFER_SIZE];
          int bytesRead = in.read(buf);
          while (bytesRead > 0) {

            // Keep sending this chunk until the datanode confirms the CRC.
            // NOTE(review): there is no upper bound on the number of
            // resends for a chunk whose CRC keeps being rejected.
            boolean checked = false;
            while (!checked) {
              blockStream.writeLong((long) bytesRead);
              blockStream.write(buf, 0, bytesRead);

              // here we will send crc data
              sum.reset();
              sum.update(buf, 0, bytesRead);
              int crc = (int) sum.getValue();
              blockStream.writeInt(crc);
              blockStream.flush();

              // read() returns -1 at end of stream; that is a broken
              // connection, not a CRC rejection, so hand it to the
              // retry logic below instead of resending blindly.
              int re = blockReplyStream.read();
              if (re < 0) {
                throw new IOException(
                    "Connection closed while waiting for chunk checksum reply");
              }
              if (re == 0x00) {
                checked = true;
              }
            }

            if (progress != null) { progress.progress(); }
            bytesRead = in.read(buf);
          }
          internalClose();
          sentOk = true;
        } catch (IOException ie) {
          handleSocketException(ie);
          remainingAttempts -= 1;
          // "<= 0" so that a configured retry count of 0 (or a negative
          // one) gives up instead of retrying forever.
          if (remainingAttempts <= 0) {
            throw ie;
          }
          try {
            Thread.sleep(sleeptime);
          } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of dropping it.
            Thread.currentThread().interrupt();
          }
        } finally {
          in.close();
        }
      }

      //
      // Delete local backup, start new one
      //
      deleteBackupFile();
      File tmpFile = newBackupFile();
      bytesWrittenToBlock = 0;
      backupStream = new FileOutputStream(tmpFile);
      backupFile = tmpFile;
    }


DataNode.java

import java.util.zip.*;

private void writeBlock(DataInputStream in) throws IOException {
      //
      // Read in the header
      //
      DataOutputStream reply =
        new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
      try {
        boolean shouldReportBlock = in.readBoolean();
        Block b = new Block();
        b.readFields(in);
        int numTargets = in.readInt();
        if (numTargets <= 0) {
          throw new IOException("Mislabelled incoming datastream.");
        }
        DatanodeInfo targets[] = new DatanodeInfo[numTargets];
        for (int i = 0; i < targets.length; i++) {
          DatanodeInfo tmp = new DatanodeInfo();
          tmp.readFields(in);
          targets[i] = tmp;
        }
        byte encodingType = (byte) in.read();
        long len = in.readLong();

        //
        // Make sure curTarget is equal to this machine
        //
        DatanodeInfo curTarget = targets[0];

        //
        // Track all the places we've successfully written the block
        //
        Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();

        //
        // Open local disk out
        //
        OutputStream o;
        try {
          o = data.writeToBlock(b);
        } catch( IOException e ) {
          checkDiskError( e );
          throw e;
        }
        DataOutputStream out = new DataOutputStream(new
BufferedOutputStream(o));
        InetSocketAddress mirrorTarget = null;
        String mirrorNode = null;
        try {
          //
          // Open network conn to backup machine, if
          // appropriate
          //
          DataInputStream in2 = null;
          DataOutputStream out2 = null;
          if (targets.length > 1) {
            // Connect to backup machine
            mirrorNode = targets[1].getName();
            mirrorTarget = createSocketAddr(mirrorNode);
            try {
              Socket s2 = new Socket();
              s2.connect(mirrorTarget, READ_TIMEOUT);
              s2.setSoTimeout(READ_TIMEOUT);
              out2 = new DataOutputStream(new
BufferedOutputStream(s2.getOutputStream()));
              in2 = new DataInputStream(new
BufferedInputStream(s2.getInputStream()));

              // Write connection header
              out2.write(OP_WRITE_BLOCK);
              out2.writeBoolean(shouldReportBlock);
              b.write(out2);
              out2.writeInt(targets.length - 1);
              for (int i = 1; i < targets.length; i++) {
                targets[i].write(out2);
              }
              out2.write(encodingType);
              out2.writeLong(len);
              myMetrics.replicatedBlocks(1);
            } catch (IOException ie) {
              if (out2 != null) {
                LOG.info("Exception connecting to mirror " + mirrorNode
                         + "\n" + StringUtils.stringifyException(ie));
                try {
                  out2.close();
                  in2.close();
                } catch (IOException out2close) {
                } finally {
                  out2 = null;
                  in2 = null;
                }
              }
            }
          }

          //
          // Process incoming data, copy to disk and
          // maybe to network. First copy to the network before
          // writing to local disk so that all datanodes might
          // write to local disk in parallel.
          //
          boolean anotherChunk = len != 0;
          byte buf[] = new byte[BUFFER_SIZE];

          Checksum sum = new CRC32();

          while (anotherChunk) {
                
                if (encodingType == CHUNKED_ENCODING)
                {
                        // read fully
                        int pos = 0;
                        int remain = (int) len;
                        while (remain > 0) {
                                int bytesRead = in.read(buf, pos, remain);
                                if (bytesRead < 0) {
                                        throw new EOFException("EOF reading 
from "+s.toString());
                                }
                                pos += bytesRead;
                                remain -= bytesRead;
                        }
                        
                        // read crc
                        int crc = in.readInt();
                        sum.reset();
                        sum.update(buf, 0, (int)len);
                        int res = (int) sum.getValue();
                        
                        if (crc == res) {
                                reply.write(0x00);
                                reply.flush();

                                if (out2 != null) {
                                        try {
                                                out2.write(buf, 0, (int)len);
                                                out2.writeInt(crc);
                                                out2.flush();
                                                byte re = (byte) in2.read();
                                        if (re != 0x00) throw new 
IOException("fail to copy
data to replica");
                  } catch (IOException out2e) {
                    LOG.info("Exception writing to mirror " + mirrorNode
                             + "\n" + StringUtils.stringifyException(out2e));
                    //
                    // If stream-copy fails, continue
                    // writing to disk.  We shouldn't
                    // interrupt client write.
                    //
                    try {
                      out2.close();
                      in2.close();
                    } catch (IOException out2close) {
                    } finally {
                      out2 = null;
                      in2 = null;
                    }
                  }
                }
                try {
                  out.write(buf, 0, (int)len);
                  myMetrics.wroteBytes((int)len);
                } catch (IOException iex) {
                  checkDiskError(iex);
                  throw iex;
                }

                len = in.readLong();
                      if (out2 != null) {
                        try {
                          out2.writeLong(len);
                        } catch (IOException ie) {
                          LOG.info("Exception writing to mirror " + mirrorNode
                                   + "\n" + StringUtils.stringifyException(ie));
                          try {
                            out2.close();
                            in2.close();
                          } catch (IOException ie2) {
                            // NOTHING
                          } finally {
                            out2 = null;
                            in2 = null;
                          }
                        }
                      }

                        } else {
                                reply.write(0x01);
                                reply.flush();
                                len = in.readLong();
                        }
                        
                        if (len == 0) {
                anotherChunk = false;
              }
                  } else if (encodingType == RUNLENGTH_ENCODING) {
                        
                        while (len > 0) {
                      int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, 
len));
                      if (bytesRead < 0) {
                        throw new EOFException("EOF reading from 
"+s.toString());
                      }
                      if (bytesRead > 0) {
                        if (out2 != null) {
                          try {
                            out2.write(buf, 0, bytesRead);
                          } catch (IOException out2e) {
                            LOG.info("Exception writing to mirror " + mirrorNode
                                     + "\n" + 
StringUtils.stringifyException(out2e));
                            //
                            // If stream-copy fails, continue
                            // writing to disk.  We shouldn't
                            // interrupt client write.
                            //
                            try {
                              out2.close();
                              in2.close();
                            } catch (IOException out2close) {
                            } finally {
                              out2 = null;
                              in2 = null;
                            }
                          }
                        }
                        try {
                          out.write(buf, 0, bytesRead);
                          myMetrics.wroteBytes(bytesRead);
                        } catch (IOException iex) {
                          checkDiskError(iex);
                          throw iex;
                        }
                        len -= bytesRead;
                      }
                }
                        
              anotherChunk = false;
            }
          }

          if (out2 != null) {
            try {
              out2.flush();
              long complete = in2.readLong();
              if (complete != WRITE_COMPLETE) {
                LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
              }
              LocatedBlock newLB = new LocatedBlock();
              newLB.readFields(in2);
              in2.close();
              out2.close();
              DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
              for (int k = 0; k < mirrorsSoFar.length; k++) {
                mirrors.add(mirrorsSoFar[k]);
              }
            } catch (IOException ie) {
              LOG.info("Exception writing to mirror " + mirrorNode
                       + "\n" + StringUtils.stringifyException(ie));
              try {
                out2.close();
                in2.close();
              } catch (IOException ie2) {
                // NOTHING
              } finally {
                out2 = null;
                in2 = null;
              }
            }
          }
          if (out2 == null) {
            LOG.info("Received block " + b + " from " +
                     s.getInetAddress());
          } else {
            LOG.info("Received block " + b + " from " +
                     s.getInetAddress() +
                     " and mirrored to " + mirrorTarget);
          }
        } finally {
          try {
            out.close();
          } catch (IOException iex) {
            checkDiskError(iex);
            throw iex;
          }
        }
        data.finalizeBlock(b);
        myMetrics.wroteBlocks(1);

        //
        // Tell the namenode that we've received this block
        // in full, if we've been asked to.  This is done
        // during NameNode-directed block transfers, but not
        // client writes.
        //
        if (shouldReportBlock) {
          synchronized (receivedBlockList) {
            receivedBlockList.add(b);
            receivedBlockList.notifyAll();
          }
        }

        //
        // Tell client job is done, and reply with
        // the new LocatedBlock.
        //
        reply.writeLong(WRITE_COMPLETE);
        mirrors.add(curTarget);
        LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new
DatanodeInfo[mirrors.size()]));
        newLB.write(reply);
      } finally {
        reply.close();
      }
    }
  }

Reply via email to