How does this fix the non-ecc memory errors?
Dennis Kubes
Daeseong Kim wrote:
To solve the checksum errors on the non-ECC memory machines, I
modified some code in DFSClient.java and DataNode.java.
The idea is very simple.
The original CHUNK structure is
{chunk size}{chunk data}{chunk size}{chunk data}...
The modified CHUNK structure is
{chunk size}{chunk data}{chunk crc}{chunk size}{chunk data}{chunk crc}...
Here is the code:
------------------------
DFSClient.java
import java.util.zip.*;
private synchronized void endBlock() throws IOException {
long sleeptime = 400;
//
// Done with local copy
//
closeBackupStream();
Checksum sum = new CRC32();
//
// Send it to datanode
//
boolean sentOk = false;
int remainingAttempts =
conf.getInt("dfs.client.block.write.retries", 3);
while (!sentOk) {
nextBlockOutputStream();
InputStream in = new FileInputStream(backupFile);
try {
byte buf[] = new byte[BUFFER_SIZE];
int bytesRead = in.read(buf);
while (bytesRead > 0) {
boolean checked = false;
while (!checked) {
blockStream.writeLong((long) bytesRead);
blockStream.write(buf, 0, bytesRead);
// here we will send crc data
sum.reset();
sum.update(buf, 0, bytesRead);
int crc = (int) sum.getValue();
blockStream.writeInt(crc);
blockStream.flush();
byte re = (byte) blockReplyStream.read();
if (re == 0x00) checked = true;
}
if (progress != null) { progress.progress(); }
bytesRead = in.read(buf);
}
internalClose();
sentOk = true;
} catch (IOException ie) {
handleSocketException(ie);
remainingAttempts -= 1;
if (remainingAttempts == 0) {
throw ie;
}
try {
Thread.sleep(sleeptime);
} catch (InterruptedException e) {
}
} finally {
in.close();
}
}
bytesWrittenToBlock = 0;
//
// Delete local backup, start new one
//
deleteBackupFile();
File tmpFile = newBackupFile();
bytesWrittenToBlock = 0;
backupStream = new FileOutputStream(tmpFile);
backupFile = tmpFile;
}
DataNode.java
import java.util.zip.*;
/**
 * Receives one block over socket {@code s}, writes it to local disk and,
 * when more targets are listed in the header, mirrors it to the next
 * datanode in the pipeline.
 *
 * Wire format (as modified in this patch): after the header, data with
 * CHUNKED_ENCODING arrives as repeated {chunk length}{chunk bytes}{CRC32}
 * records.  Each chunk is acknowledged back to the sender with 0x00
 * (CRC ok) or 0x01 (CRC mismatch — the sender is expected to retransmit
 * the chunk, including its length prefix and CRC).
 *
 * @param in stream positioned at the write-op header for this block
 * @throws IOException on a malformed header, disk failure, or EOF
 */
private void writeBlock(DataInputStream in) throws IOException {
//
// Read in the header
//
// 'reply' carries the per-chunk CRC acks and the final WRITE_COMPLETE
// message back to whoever is sending us this block (client or an
// upstream datanode).
DataOutputStream reply =
new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
try {
boolean shouldReportBlock = in.readBoolean();
Block b = new Block();
b.readFields(in);
int numTargets = in.readInt();
if (numTargets <= 0) {
throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
DatanodeInfo tmp = new DatanodeInfo();
tmp.readFields(in);
targets[i] = tmp;
}
byte encodingType = (byte) in.read();
// 'len' is the length of the first chunk; it is refreshed from the
// stream after each chunk, and a value of 0 terminates the block.
long len = in.readLong();
//
// Make sure curTarget is equal to this machine
//
DatanodeInfo curTarget = targets[0];
//
// Track all the places we've successfully written the block
//
Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
//
// Open local disk out
//
OutputStream o;
try {
o = data.writeToBlock(b);
} catch( IOException e ) {
checkDiskError( e );
throw e;
}
DataOutputStream out = new DataOutputStream(new
BufferedOutputStream(o));
InetSocketAddress mirrorTarget = null;
String mirrorNode = null;
try {
//
// Open network conn to backup machine, if
// appropriate
//
DataInputStream in2 = null;
DataOutputStream out2 = null;
if (targets.length > 1) {
// Connect to backup machine
mirrorNode = targets[1].getName();
mirrorTarget = createSocketAddr(mirrorNode);
try {
Socket s2 = new Socket();
s2.connect(mirrorTarget, READ_TIMEOUT);
s2.setSoTimeout(READ_TIMEOUT);
out2 = new DataOutputStream(new
BufferedOutputStream(s2.getOutputStream()));
in2 = new DataInputStream(new
BufferedInputStream(s2.getInputStream()));
// Write connection header: the mirror sees the same header we
// received, minus ourselves (targets[0]).
out2.write(OP_WRITE_BLOCK);
out2.writeBoolean(shouldReportBlock);
b.write(out2);
out2.writeInt(targets.length - 1);
for (int i = 1; i < targets.length; i++) {
targets[i].write(out2);
}
out2.write(encodingType);
out2.writeLong(len);
myMetrics.replicatedBlocks(1);
} catch (IOException ie) {
// Mirror connection failed: log, drop the mirror streams, and
// carry on writing locally.  out2 == null signals "no mirror"
// everywhere below.
if (out2 != null) {
LOG.info("Exception connecting to mirror " + mirrorNode
+ "\n" + StringUtils.stringifyException(ie));
try {
out2.close();
in2.close();
} catch (IOException out2close) {
} finally {
out2 = null;
in2 = null;
}
}
}
}
//
// Process incoming data, copy to disk and
// maybe to network. First copy to the network before
// writing to local disk so that all datanodes might
// write to local disk in parallel.
//
boolean anotherChunk = len != 0;
byte buf[] = new byte[BUFFER_SIZE];
Checksum sum = new CRC32();
while (anotherChunk) {
if (encodingType == CHUNKED_ENCODING)
{
// read fully
// NOTE(review): assumes each chunk fits in BUFFER_SIZE; the
// client sends at most BUFFER_SIZE bytes per chunk — confirm
// both sides use the same constant.
int pos = 0;
int remain = (int) len;
while (remain > 0) {
int bytesRead = in.read(buf, pos, remain);
if (bytesRead < 0) {
throw new EOFException("EOF reading from
"+s.toString());
}
pos += bytesRead;
remain -= bytesRead;
}
// read crc
// Recompute CRC32 over the received bytes and compare with the
// sender's trailing CRC; this is the corruption check this
// patch adds for non-ECC machines.
int crc = in.readInt();
sum.reset();
sum.update(buf, 0, (int)len);
int res = (int) sum.getValue();
if (crc == res) {
// ACK the verified chunk back to the sender.
reply.write(0x00);
reply.flush();
if (out2 != null) {
// Forward the verified chunk (and its CRC) downstream and
// wait for the mirror's own per-chunk ack.
// NOTE(review): the chunk's length prefix was sent earlier
// (header writeLong / the writeLong after the previous
// chunk), so only data+crc go here.
try {
out2.write(buf, 0, (int)len);
out2.writeInt(crc);
out2.flush();
// NOTE(review): in2.read() returning -1 at EOF becomes
// (byte)0xFF here and is treated as a failed ack — verify
// that is the intended behavior.
byte re = (byte) in2.read();
if (re != 0x00) throw new
IOException("fail to copy
data to replica");
} catch (IOException out2e) {
LOG.info("Exception writing to mirror " + mirrorNode
+ "\n" + StringUtils.stringifyException(out2e));
//
// If stream-copy fails, continue
// writing to disk. We shouldn't
// interrupt client write.
//
try {
out2.close();
in2.close();
} catch (IOException out2close) {
} finally {
out2 = null;
in2 = null;
}
}
}
try {
out.write(buf, 0, (int)len);
myMetrics.wroteBytes((int)len);
} catch (IOException iex) {
checkDiskError(iex);
throw iex;
}
// Length prefix of the NEXT chunk (0 terminates the block).
len = in.readLong();
if (out2 != null) {
try {
out2.writeLong(len);
} catch (IOException ie) {
LOG.info("Exception writing to mirror " + mirrorNode
+ "\n" + StringUtils.stringifyException(ie));
try {
out2.close();
in2.close();
} catch (IOException ie2) {
// NOTHING
} finally {
out2 = null;
in2 = null;
}
}
}
} else {
// CRC mismatch: NAK so the sender retransmits, then read the
// retransmitted chunk's length prefix.
// NOTE(review): the rejected chunk is never forwarded to the
// mirror, but the mirror was already told this chunk's length
// — confirm the retransmit path keeps the mirror stream in
// sync (len is re-sent via writeLong above after acceptance).
reply.write(0x01);
reply.flush();
len = in.readLong();
}
if (len == 0) {
anotherChunk = false;
}
} else if (encodingType == RUNLENGTH_ENCODING) {
// Legacy single-run path: 'len' is the total block length and
// there is no per-chunk CRC verification here.
while (len > 0) {
int bytesRead = in.read(buf, 0, (int)Math.min(buf.length,
len));
if (bytesRead < 0) {
throw new EOFException("EOF reading from
"+s.toString());
}
if (bytesRead > 0) {
if (out2 != null) {
try {
out2.write(buf, 0, bytesRead);
} catch (IOException out2e) {
LOG.info("Exception writing to mirror " + mirrorNode
+ "\n" +
StringUtils.stringifyException(out2e));
//
// If stream-copy fails, continue
// writing to disk. We shouldn't
// interrupt client write.
//
try {
out2.close();
in2.close();
} catch (IOException out2close) {
} finally {
out2 = null;
in2 = null;
}
}
}
try {
out.write(buf, 0, bytesRead);
myMetrics.wroteBytes(bytesRead);
} catch (IOException iex) {
checkDiskError(iex);
throw iex;
}
len -= bytesRead;
}
}
anotherChunk = false;
}
}
// Block fully received: wait for the mirror's WRITE_COMPLETE and
// collect the locations it reports so we can include them in our
// own reply.
if (out2 != null) {
try {
out2.flush();
long complete = in2.readLong();
if (complete != WRITE_COMPLETE) {
LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
}
LocatedBlock newLB = new LocatedBlock();
newLB.readFields(in2);
in2.close();
out2.close();
DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
for (int k = 0; k < mirrorsSoFar.length; k++) {
mirrors.add(mirrorsSoFar[k]);
}
} catch (IOException ie) {
LOG.info("Exception writing to mirror " + mirrorNode
+ "\n" + StringUtils.stringifyException(ie));
try {
out2.close();
in2.close();
} catch (IOException ie2) {
// NOTHING
} finally {
out2 = null;
in2 = null;
}
}
}
if (out2 == null) {
LOG.info("Received block " + b + " from " +
s.getInetAddress());
} else {
LOG.info("Received block " + b + " from " +
s.getInetAddress() +
" and mirrored to " + mirrorTarget);
}
} finally {
try {
out.close();
} catch (IOException iex) {
checkDiskError(iex);
throw iex;
}
}
data.finalizeBlock(b);
myMetrics.wroteBlocks(1);
//
// Tell the namenode that we've received this block
// in full, if we've been asked to. This is done
// during NameNode-directed block transfers, but not
// client writes.
//
if (shouldReportBlock) {
synchronized (receivedBlockList) {
receivedBlockList.add(b);
receivedBlockList.notifyAll();
}
}
//
// Tell client job is done, and reply with
// the new LocatedBlock.
//
reply.writeLong(WRITE_COMPLETE);
mirrors.add(curTarget);
LocatedBlock newLB = new LocatedBlock(b, mirrors.toArray(new
DatanodeInfo[mirrors.size()]));
newLB.write(reply);
} finally {
reply.close();
}
}
}