The way that I fix errors on non-ECC memory machines is by throwing them in
the trash.

The benefit of ECC comes at such a low cost that it is very hard to justify
screwing around with methods to compensate in software.


On 8/14/07 7:35 AM, "Dennis Kubes" <[EMAIL PROTECTED]> wrote:

> How does this fix the non-ECC memory errors?
> 
> Dennis Kubes
> 
> Daeseong Kim wrote:
>> To work around the checksum errors on non-ECC memory machines, I
>> modified some code in DFSClient.java and DataNode.java.
>> 
>> The idea is very simple.
>> The original CHUNK structure is
>> {chunk size}{chunk data}{chunk size}{chunk data}...
>> 
>> The modified CHUNK structure is
>> {chunk size}{chunk data}{chunk crc}{chunk size}{chunk data}{chunk crc}...
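>>
>> For illustration, here is a minimal, standalone sketch of that framing
>> (the class and method names are made up and are separate from the actual
>> patch below): the writer appends a CRC32 after each chunk, and the reader
>> recomputes it and answers with a status byte (0x00 = ok, 0x01 = resend).
>>
>> import java.io.*;
>> import java.util.zip.CRC32;
>>
>> public class ChunkFraming {
>>
>>   // Write one chunk as {chunk size}{chunk data}{chunk crc}.
>>   static void writeChunk(DataOutputStream out, byte[] buf, int len)
>>       throws IOException {
>>     CRC32 sum = new CRC32();
>>     sum.update(buf, 0, len);
>>     out.writeLong(len);                  // {chunk size}
>>     out.write(buf, 0, len);              // {chunk data}
>>     out.writeInt((int) sum.getValue());  // {chunk crc}
>>     out.flush();
>>   }
>>
>>   // Read one chunk, recompute its CRC32, and return whether it matched.
>>   static boolean readAndVerifyChunk(DataInputStream in, byte[] buf)
>>       throws IOException {
>>     int len = (int) in.readLong();       // {chunk size}
>>     in.readFully(buf, 0, len);           // {chunk data}
>>     int crc = in.readInt();              // {chunk crc}
>>     CRC32 sum = new CRC32();
>>     sum.update(buf, 0, len);
>>     return crc == (int) sum.getValue();
>>   }
>>
>>   public static void main(String[] args) throws IOException {
>>     byte[] data = "example chunk payload".getBytes("UTF-8");
>>
>>     // Writer side: frame one chunk into an in-memory stream.
>>     ByteArrayOutputStream wire = new ByteArrayOutputStream();
>>     writeChunk(new DataOutputStream(wire), data, data.length);
>>
>>     // Reader side: verify the frame and choose the reply byte.
>>     DataInputStream in =
>>       new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
>>     byte[] buf = new byte[64 * 1024];
>>     byte reply = readAndVerifyChunk(in, buf) ? (byte) 0x00 : (byte) 0x01;
>>     System.out.println("reply = " + reply);  // 0 means the chunk was accepted
>>   }
>> }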
>> 
>> Here is the code.
>> ------------------------
>> DFSClient.java
>> 
>> import java.util.zip.*;
>> 
>> private synchronized void endBlock() throws IOException {
>>       long sleeptime = 400;
>>       //
>>       // Done with local copy
>>       //
>>       closeBackupStream();
>>       Checksum sum = new CRC32();
>> 
>>       //
>>       // Send it to datanode
>>       //
>>       boolean sentOk = false;
>>       int remainingAttempts =
>>         conf.getInt("dfs.client.block.write.retries", 3);
>>       while (!sentOk) {
>>         nextBlockOutputStream();
>>         InputStream in = new FileInputStream(backupFile);
>>         try {
>>           byte buf[] = new byte[BUFFER_SIZE];
>>           int bytesRead = in.read(buf);
>>           while (bytesRead > 0) {
>> 
>>             // Re-send each chunk, followed by its CRC32, until the
>>             // datanode acknowledges it with a 0x00 reply byte.
>>             boolean checked = false;
>>             while (!checked) {
>>               blockStream.writeLong((long) bytesRead);
>>               blockStream.write(buf, 0, bytesRead);
>>
>>               // append the CRC of this chunk so the datanode can verify it
>>               sum.reset();
>>               sum.update(buf, 0, bytesRead);
>>               int crc = (int) sum.getValue();
>>               blockStream.writeInt(crc);
>>               blockStream.flush();
>>
>>               // 0x00 means the datanode's CRC matched; otherwise resend
>>               byte re = (byte) blockReplyStream.read();
>>               if (re == 0x00) checked = true;
>>             }
>> 
>>             if (progress != null) { progress.progress(); }
>>             bytesRead = in.read(buf);
>>           }
>>           internalClose();
>>           sentOk = true;
>>         } catch (IOException ie) {
>>           handleSocketException(ie);
>>           remainingAttempts -= 1;
>>           if (remainingAttempts == 0) {
>>             throw ie;
>>           }
>>           try {
>>             Thread.sleep(sleeptime);
>>           } catch (InterruptedException e) {
>>           }
>>         } finally {
>>           in.close();
>>         }
>>       }
>> 
>>       bytesWrittenToBlock = 0;
>>       //
>>       // Delete local backup, start new one
>>       //
>>       deleteBackupFile();
>>       File tmpFile = newBackupFile();
>>       bytesWrittenToBlock = 0;
>>       backupStream = new FileOutputStream(tmpFile);
>>       backupFile = tmpFile;
>>     }
>> 
>> 
>> DataNode.java
>> 
>> import java.util.zip.*;
>> 
>> private void writeBlock(DataInputStream in) throws IOException {
>>       //
>>       // Read in the header
>>       //
>>       DataOutputStream reply =
>>         new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
>>       try {
>>         boolean shouldReportBlock = in.readBoolean();
>>         Block b = new Block();
>>         b.readFields(in);
>>         int numTargets = in.readInt();
>>         if (numTargets <= 0) {
>>           throw new IOException("Mislabelled incoming datastream.");
>>         }
>>         DatanodeInfo targets[] = new DatanodeInfo[numTargets];
>>         for (int i = 0; i < targets.length; i++) {
>>           DatanodeInfo tmp = new DatanodeInfo();
>>           tmp.readFields(in);
>>           targets[i] = tmp;
>>         }
>>         byte encodingType = (byte) in.read();
>>         long len = in.readLong();
>> 
>>         //
>>         // Make sure curTarget is equal to this machine
>>         //
>>         DatanodeInfo curTarget = targets[0];
>> 
>>         //
>>         // Track all the places we've successfully written the block
>>         //
>>         Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
>> 
>>         //
>>         // Open local disk out
>>         //
>>         OutputStream o;
>>         try {
>>           o = data.writeToBlock(b);
>>         } catch( IOException e ) {
>>           checkDiskError( e );
>>           throw e;
>>         }
>>         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(o));
>>         InetSocketAddress mirrorTarget = null;
>>         String mirrorNode = null;
>>         try {
>>           //
>>           // Open network conn to backup machine, if
>>           // appropriate
>>           //
>>           DataInputStream in2 = null;
>>           DataOutputStream out2 = null;
>>           if (targets.length > 1) {
>>             // Connect to backup machine
>>             mirrorNode = targets[1].getName();
>>             mirrorTarget = createSocketAddr(mirrorNode);
>>             try {
>>               Socket s2 = new Socket();
>>               s2.connect(mirrorTarget, READ_TIMEOUT);
>>               s2.setSoTimeout(READ_TIMEOUT);
>>               out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
>>               in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
>> 
>>               // Write connection header
>>               out2.write(OP_WRITE_BLOCK);
>>               out2.writeBoolean(shouldReportBlock);
>>               b.write(out2);
>>               out2.writeInt(targets.length - 1);
>>               for (int i = 1; i < targets.length; i++) {
>>                 targets[i].write(out2);
>>               }
>>               out2.write(encodingType);
>>               out2.writeLong(len);
>>               myMetrics.replicatedBlocks(1);
>>             } catch (IOException ie) {
>>               if (out2 != null) {
>>                 LOG.info("Exception connecting to mirror " + mirrorNode
>>                          + "\n" + StringUtils.stringifyException(ie));
>>                 try {
>>                   out2.close();
>>                   in2.close();
>>                 } catch (IOException out2close) {
>>                 } finally {
>>                   out2 = null;
>>                   in2 = null;
>>                 }
>>               }
>>             }
>>           }
>> 
>>           //
>>           // Process incoming data, copy to disk and
>>           // maybe to network. First copy to the network before
>>           // writing to local disk so that all datanodes might
>>           // write to local disk in parallel.
>>           //
>>           boolean anotherChunk = len != 0;
>>           byte buf[] = new byte[BUFFER_SIZE];
>> 
>>           Checksum sum = new CRC32();
>> 
>>           while (anotherChunk) {
>>
>>             if (encodingType == CHUNKED_ENCODING) {
>>               // Read exactly len bytes of chunk data.
>>               int pos = 0;
>>               int remain = (int) len;
>>               while (remain > 0) {
>>                 int bytesRead = in.read(buf, pos, remain);
>>                 if (bytesRead < 0) {
>>                   throw new EOFException("EOF reading from " + s.toString());
>>                 }
>>                 pos += bytesRead;
>>                 remain -= bytesRead;
>>               }
>>
>>               // Read the chunk's CRC and recompute it over the received data.
>>               int crc = in.readInt();
>>               sum.reset();
>>               sum.update(buf, 0, (int) len);
>>               int res = (int) sum.getValue();
>>
>>               if (crc == res) {
>>                 // CRC matched: acknowledge the chunk to the client.
>>                 reply.write(0x00);
>>                 reply.flush();
>>
>>                 if (out2 != null) {
>>                   try {
>>                     out2.write(buf, 0, (int) len);
>>                     out2.writeInt(crc);
>>                     out2.flush();
>>                     byte re = (byte) in2.read();
>>                     if (re != 0x00) throw new IOException("failed to copy data to replica");
>>                   } catch (IOException out2e) {
>>                     LOG.info("Exception writing to mirror " + mirrorNode
>>                              + "\n" + StringUtils.stringifyException(out2e));
>>                     //
>>                     // If stream-copy fails, continue
>>                     // writing to disk.  We shouldn't
>>                     // interrupt client write.
>>                     //
>>                     try {
>>                       out2.close();
>>                       in2.close();
>>                     } catch (IOException out2close) {
>>                     } finally {
>>                       out2 = null;
>>                       in2 = null;
>>                     }
>>                   }
>>                 }
>>                 try {
>>                   out.write(buf, 0, (int) len);
>>                   myMetrics.wroteBytes((int) len);
>>                 } catch (IOException iex) {
>>                   checkDiskError(iex);
>>                   throw iex;
>>                 }
>>
>>                 len = in.readLong();
>>                 if (out2 != null) {
>>                   try {
>>                     out2.writeLong(len);
>>                   } catch (IOException ie) {
>>                     LOG.info("Exception writing to mirror " + mirrorNode
>>                              + "\n" + StringUtils.stringifyException(ie));
>>                     try {
>>                       out2.close();
>>                       in2.close();
>>                     } catch (IOException ie2) {
>>                       // NOTHING
>>                     } finally {
>>                       out2 = null;
>>                       in2 = null;
>>                     }
>>                   }
>>                 }
>>
>>               } else {
>>                 // CRC mismatch: ask the client to resend this chunk.
>>                 reply.write(0x01);
>>                 reply.flush();
>>                 len = in.readLong();
>>               }
>>
>>               if (len == 0) {
>>                 anotherChunk = false;
>>               }
>>             } else if (encodingType == RUNLENGTH_ENCODING) {
>>
>>               while (len > 0) {
>>                 int bytesRead = in.read(buf, 0, (int) Math.min(buf.length, len));
>>                 if (bytesRead < 0) {
>>                   throw new EOFException("EOF reading from " + s.toString());
>>                 }
>>                 if (bytesRead > 0) {
>>                   if (out2 != null) {
>>                     try {
>>                       out2.write(buf, 0, bytesRead);
>>                     } catch (IOException out2e) {
>>                       LOG.info("Exception writing to mirror " + mirrorNode
>>                                + "\n" + StringUtils.stringifyException(out2e));
>>                       //
>>                       // If stream-copy fails, continue
>>                       // writing to disk.  We shouldn't
>>                       // interrupt client write.
>>                       //
>>                       try {
>>                         out2.close();
>>                         in2.close();
>>                       } catch (IOException out2close) {
>>                       } finally {
>>                         out2 = null;
>>                         in2 = null;
>>                       }
>>                     }
>>                   }
>>                   try {
>>                     out.write(buf, 0, bytesRead);
>>                     myMetrics.wroteBytes(bytesRead);
>>                   } catch (IOException iex) {
>>                     checkDiskError(iex);
>>                     throw iex;
>>                   }
>>                   len -= bytesRead;
>>                 }
>>               }
>>
>>               anotherChunk = false;
>>             }
>>           }
>> 
>>           if (out2 != null) {
>>             try {
>>               out2.flush();
>>               long complete = in2.readLong();
>>               if (complete != WRITE_COMPLETE) {
>>                 LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
>>               }
>>               LocatedBlock newLB = new LocatedBlock();
>>               newLB.readFields(in2);
>>               in2.close();
>>               out2.close();
>>               DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
>>               for (int k = 0; k < mirrorsSoFar.length; k++) {
>>                 mirrors.add(mirrorsSoFar[k]);
>>               }
>>             } catch (IOException ie) {
>>               LOG.info("Exception writing to mirror " + mirrorNode
>>                        + "\n" + StringUtils.stringifyException(ie));
>>               try {
>>                 out2.close();
>>                 in2.close();
>>               } catch (IOException ie2) {
>>                 // NOTHING
>>               } finally {
>>                 out2 = null;
>>                 in2 = null;
>>               }
>>             }
>>           }
>>           if (out2 == null) {
>>             LOG.info("Received block " + b + " from " +
>>                      s.getInetAddress());
>>           } else {
>>             LOG.info("Received block " + b + " from " +
>>                      s.getInetAddress() +
>>                      " and mirrored to " + mirrorTarget);
>>           }
>>         } finally {
>>           try {
>>             out.close();
>>           } catch (IOException iex) {
>>             checkDiskError(iex);
>>             throw iex;
>>           }
>>         }
>>         data.finalizeBlock(b);
>>         myMetrics.wroteBlocks(1);
>> 
>>         //
>>         // Tell the namenode that we've received this block
>>         // in full, if we've been asked to.  This is done
>>         // during NameNode-directed block transfers, but not
>>         // client writes.
>>         //
>>         if (shouldReportBlock) {
>>           synchronized (receivedBlockList) {
>>             receivedBlockList.add(b);
>>             receivedBlockList.notifyAll();
>>           }
>>         }
>> 
>>         //
>>         // Tell client job is done, and reply with
>>         // the new LocatedBlock.
>>         //
>>         reply.writeLong(WRITE_COMPLETE);
>>         mirrors.add(curTarget);
>>         LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new DatanodeInfo[mirrors.size()]));
>>         newLB.write(reply);
>>       } finally {
>>         reply.close();
>>       }
>>     }
