The way that I fix errors on non-ecc memory machines is by throwing them in
the trash.
The benefit of ecc comes at such a low cost that it is very hard to justify
screwing around with methods to compensate in software.
On 8/14/07 7:35 AM, "Dennis Kubes" <[EMAIL PROTECTED]> wrote:
> How does this fix the non-ecc memory errors?
>
> Dennis Kubes
>
> Daeseong Kim wrote:
>> To solve the checksum errors on the non-ecc memory machines, I
>> modified some code in DFSClient.java and DataNode.java.
>>
>> The idea is very simple.
>> The original CHUNK structure is
>> {chunk size}{chunk data}{chunk size}{chunk data}...
>>
>> The modified CHUNK structure is
>> {chunk size}{chunk data}{chunk crc}{chunk size}{chunk data}{chunk crc}...
>>
>> Here is the code.
>> ------------------------
>> DFSClient.java
>>
>> import java.util.zip.*;
>>
>> // Sends the locally-buffered block (backupFile) to the datanode, then
>> // resets the local backup for the next block. The whole transfer is
>> // retried up to dfs.client.block.write.retries times on IOException.
>> // Modified wire format: each {chunk size}{chunk data} pair is followed
>> // by a CRC32 of the chunk; the same chunk is re-sent until the datanode
>> // acks it with 0x00 (see the matching check in DataNode.writeBlock).
>> private synchronized void endBlock() throws IOException {
>> long sleeptime = 400;
>> //
>> // Done with local copy
>> //
>> closeBackupStream();
>> Checksum sum = new CRC32();
>>
>> //
>> // Send it to datanode
>> //
>> boolean sentOk = false;
>> int remainingAttempts =
>> conf.getInt("dfs.client.block.write.retries", 3);
>> while (!sentOk) {
>> nextBlockOutputStream();
>> // Re-open the backup file on every attempt so a failed attempt
>> // restarts the transfer from the beginning of the block.
>> InputStream in = new FileInputStream(backupFile);
>> try {
>> byte buf[] = new byte[BUFFER_SIZE];
>> int bytesRead = in.read(buf);
>> while (bytesRead > 0) {
>>
>> // Re-send this one chunk until the datanode's CRC check passes.
>> // NOTE(review): no retry cap here, and an EOF (-1) from
>> // blockReplyStream casts to a non-zero byte, so a dead
>> // connection could spin in this loop forever -- confirm.
>> boolean checked = false;
>> while (!checked) {
>> blockStream.writeLong((long) bytesRead);
>> blockStream.write(buf, 0, bytesRead);
>>
>> // here we will send crc data
>> sum.reset();
>> sum.update(buf, 0, bytesRead);
>> int crc = (int) sum.getValue();
>> blockStream.writeInt(crc);
>> blockStream.flush();
>>
>> // Single-byte ack from the datanode: 0x00 = CRC matched.
>> byte re = (byte) blockReplyStream.read();
>> if (re == 0x00) checked = true;
>> }
>>
>> if (progress != null) { progress.progress(); }
>> bytesRead = in.read(buf);
>> }
>> internalClose();
>> sentOk = true;
>> } catch (IOException ie) {
>> handleSocketException(ie);
>> remainingAttempts -= 1;
>> if (remainingAttempts == 0) {
>> throw ie;
>> }
>> try {
>> Thread.sleep(sleeptime);
>> } catch (InterruptedException e) {
>> // NOTE(review): interrupt status is swallowed here; consider
>> // Thread.currentThread().interrupt().
>> }
>> } finally {
>> in.close();
>> }
>> }
>>
>> bytesWrittenToBlock = 0;
>> //
>> // Delete local backup, start new one
>> //
>> deleteBackupFile();
>> File tmpFile = newBackupFile();
>> bytesWrittenToBlock = 0;
>> backupStream = new FileOutputStream(tmpFile);
>> backupFile = tmpFile;
>> }
>>
>>
>> DataNode.java
>>
>> import java.util.zip.*;
>>
>> // Receives one block over the socket, writes it to local disk, and
>> // forwards it to the next datanode in the pipeline (targets[1]) when
>> // there is one. Modified protocol (CHUNKED_ENCODING): each
>> // {chunk size}{chunk data} pair is followed by a CRC32 of the chunk;
>> // the chunk is acked back to the sender with 0x00 on a CRC match or
>> // 0x01 on a mismatch, in which case the sender re-sends the chunk.
>> private void writeBlock(DataInputStream in) throws IOException {
>> //
>> // Read in the header
>> //
>> DataOutputStream reply =
>> new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
>> try {
>> boolean shouldReportBlock = in.readBoolean();
>> Block b = new Block();
>> b.readFields(in);
>> int numTargets = in.readInt();
>> if (numTargets <= 0) {
>> throw new IOException("Mislabelled incoming datastream.");
>> }
>> DatanodeInfo targets[] = new DatanodeInfo[numTargets];
>> for (int i = 0; i < targets.length; i++) {
>> DatanodeInfo tmp = new DatanodeInfo();
>> tmp.readFields(in);
>> targets[i] = tmp;
>> }
>> // encodingType selects the CHUNKED (with per-chunk CRC) or
>> // RUNLENGTH branch below; len is the size of the first chunk.
>> byte encodingType = (byte) in.read();
>> long len = in.readLong();
>>
>> //
>> // Make sure curTarget is equal to this machine
>> //
>> DatanodeInfo curTarget = targets[0];
>>
>> //
>> // Track all the places we've successfully written the block
>> //
>> Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
>>
>> //
>> // Open local disk out
>> //
>> OutputStream o;
>> try {
>> o = data.writeToBlock(b);
>> } catch( IOException e ) {
>> checkDiskError( e );
>> throw e;
>> }
>> DataOutputStream out = new DataOutputStream(new
>> BufferedOutputStream(o));
>> InetSocketAddress mirrorTarget = null;
>> String mirrorNode = null;
>> try {
>> //
>> // Open network conn to backup machine, if
>> // appropriate
>> //
>> DataInputStream in2 = null;
>> DataOutputStream out2 = null;
>> if (targets.length > 1) {
>> // Connect to backup machine
>> mirrorNode = targets[1].getName();
>> mirrorTarget = createSocketAddr(mirrorNode);
>> try {
>> Socket s2 = new Socket();
>> s2.connect(mirrorTarget, READ_TIMEOUT);
>> s2.setSoTimeout(READ_TIMEOUT);
>> out2 = new DataOutputStream(new
>> BufferedOutputStream(s2.getOutputStream()));
>> in2 = new DataInputStream(new
>> BufferedInputStream(s2.getInputStream()));
>>
>> // Write connection header
>> out2.write(OP_WRITE_BLOCK);
>> out2.writeBoolean(shouldReportBlock);
>> b.write(out2);
>> out2.writeInt(targets.length - 1);
>> for (int i = 1; i < targets.length; i++) {
>> targets[i].write(out2);
>> }
>> out2.write(encodingType);
>> out2.writeLong(len);
>> myMetrics.replicatedBlocks(1);
>> } catch (IOException ie) {
>> if (out2 != null) {
>> LOG.info("Exception connecting to mirror " + mirrorNode
>> + "\n" + StringUtils.stringifyException(ie));
>> try {
>> out2.close();
>> in2.close();
>> } catch (IOException out2close) {
>> } finally {
>> // A null out2/in2 below means "no mirror": all further
>> // mirror writes are skipped.
>> out2 = null;
>> in2 = null;
>> }
>> }
>> }
>> }
>>
>> //
>> // Process incoming data, copy to disk and
>> // maybe to network. First copy to the network before
>> // writing to local disk so that all datanodes might
>> // write to local disk in parallel.
>> //
>> boolean anotherChunk = len != 0;
>> byte buf[] = new byte[BUFFER_SIZE];
>>
>> // CRC32 accumulator, reset and reused for every incoming chunk.
>> Checksum sum = new CRC32();
>>
>> while (anotherChunk) {
>>
>> if (encodingType == CHUNKED_ENCODING)
>> {
>> // read fully
>> int pos = 0;
>> int remain = (int) len;
>> while (remain > 0) {
>> int bytesRead = in.read(buf, pos, remain);
>> if (bytesRead < 0) {
>> throw new EOFException("EOF reading from "+s.toString());
>> }
>> pos += bytesRead;
>> remain -= bytesRead;
>> }
>>
>> // read crc
>> int crc = in.readInt();
>> sum.reset();
>> sum.update(buf, 0, (int)len);
>> int res = (int) sum.getValue();
>>
>> // CRC matched: ack 0x00, forward data+crc to the mirror,
>> // then write the verified chunk to local disk.
>> if (crc == res) {
>> reply.write(0x00);
>> reply.flush();
>>
>> if (out2 != null) {
>> try {
>> out2.write(buf, 0, (int)len);
>> out2.writeInt(crc);
>> out2.flush();
>> // Wait for the mirror's own per-chunk CRC ack.
>> byte re = (byte) in2.read();
>> if (re != 0x00) throw new IOException("fail to copy
>> data to replica");
>> } catch (IOException out2e) {
>> LOG.info("Exception writing to mirror " + mirrorNode
>> + "\n" + StringUtils.stringifyException(out2e));
>> //
>> // If stream-copy fails, continue
>> // writing to disk. We shouldn't
>> // interrupt client write.
>> //
>> try {
>> out2.close();
>> in2.close();
>> } catch (IOException out2close) {
>> } finally {
>> out2 = null;
>> in2 = null;
>> }
>> }
>> }
>> try {
>> out.write(buf, 0, (int)len);
>> myMetrics.wroteBytes((int)len);
>> } catch (IOException iex) {
>> checkDiskError(iex);
>> throw iex;
>> }
>>
>> // Read the next chunk's size; 0 terminates the block.
>> len = in.readLong();
>> if (out2 != null) {
>> try {
>> out2.writeLong(len);
>> } catch (IOException ie) {
>> LOG.info("Exception writing to mirror " + mirrorNode
>> + "\n" + StringUtils.stringifyException(ie));
>> try {
>> out2.close();
>> in2.close();
>> } catch (IOException ie2) {
>> // NOTHING
>> } finally {
>> out2 = null;
>> in2 = null;
>> }
>> }
>> }
>>
>> } else {
>> // CRC mismatch: discard the corrupt chunk and NAK with
>> // 0x01; the sender re-sends the same chunk, whose size
>> // arrives as the next long.
>> reply.write(0x01);
>> reply.flush();
>> len = in.readLong();
>> }
>>
>> if (len == 0) {
>> anotherChunk = false;
>> }
>> } else if (encodingType == RUNLENGTH_ENCODING) {
>>
>> // Original run-length path: a single pass of len bytes with
>> // no per-chunk CRC verification.
>> while (len > 0) {
>> int bytesRead = in.read(buf, 0, (int)Math.min(buf.length, len));
>> if (bytesRead < 0) {
>> throw new EOFException("EOF reading from "+s.toString());
>> }
>> if (bytesRead > 0) {
>> if (out2 != null) {
>> try {
>> out2.write(buf, 0, bytesRead);
>> } catch (IOException out2e) {
>> LOG.info("Exception writing to mirror " + mirrorNode
>> + "\n" + StringUtils.stringifyException(out2e));
>> //
>> // If stream-copy fails, continue
>> // writing to disk. We shouldn't
>> // interrupt client write.
>> //
>> try {
>> out2.close();
>> in2.close();
>> } catch (IOException out2close) {
>> } finally {
>> out2 = null;
>> in2 = null;
>> }
>> }
>> }
>> try {
>> out.write(buf, 0, bytesRead);
>> myMetrics.wroteBytes(bytesRead);
>> } catch (IOException iex) {
>> checkDiskError(iex);
>> throw iex;
>> }
>> len -= bytesRead;
>> }
>> }
>>
>> anotherChunk = false;
>> }
>> }
>>
>> // Drain the mirror's completion reply and merge its list of
>> // downstream replicas into our own.
>> if (out2 != null) {
>> try {
>> out2.flush();
>> long complete = in2.readLong();
>> if (complete != WRITE_COMPLETE) {
>> LOG.info("Conflicting value for WRITE_COMPLETE: " +
>> complete);
>> }
>> LocatedBlock newLB = new LocatedBlock();
>> newLB.readFields(in2);
>> in2.close();
>> out2.close();
>> DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
>> for (int k = 0; k < mirrorsSoFar.length; k++) {
>> mirrors.add(mirrorsSoFar[k]);
>> }
>> } catch (IOException ie) {
>> LOG.info("Exception writing to mirror " + mirrorNode
>> + "\n" + StringUtils.stringifyException(ie));
>> try {
>> out2.close();
>> in2.close();
>> } catch (IOException ie2) {
>> // NOTHING
>> } finally {
>> out2 = null;
>> in2 = null;
>> }
>> }
>> }
>> if (out2 == null) {
>> LOG.info("Received block " + b + " from " +
>> s.getInetAddress());
>> } else {
>> LOG.info("Received block " + b + " from " +
>> s.getInetAddress() +
>> " and mirrored to " + mirrorTarget);
>> }
>> } finally {
>> try {
>> out.close();
>> } catch (IOException iex) {
>> checkDiskError(iex);
>> throw iex;
>> }
>> }
>> data.finalizeBlock(b);
>> myMetrics.wroteBlocks(1);
>>
>> //
>> // Tell the namenode that we've received this block
>> // in full, if we've been asked to. This is done
>> // during NameNode-directed block transfers, but not
>> // client writes.
>> //
>> if (shouldReportBlock) {
>> synchronized (receivedBlockList) {
>> receivedBlockList.add(b);
>> receivedBlockList.notifyAll();
>> }
>> }
>>
>> //
>> // Tell client job is done, and reply with
>> // the new LocatedBlock.
>> //
>> reply.writeLong(WRITE_COMPLETE);
>> mirrors.add(curTarget);
>> LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new
>> DatanodeInfo[mirrors.size()]));
>> newLB.write(reply);
>> } finally {
>> reply.close();
>> }
>> }
>> }