Author: szetszwo
Date: Mon Jun 4 17:43:51 2012
New Revision: 1346050
URL: http://svn.apache.org/viewvc?rev=1346050&view=rev
Log:
svn merge -c 1344419 from trunk for HDFS-744. Support hsync in HDFS.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
- copied unchanged from r1344419,
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props
changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
(props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1344419
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Mon Jun 4 17:43:51 2012
@@ -6,6 +6,8 @@ Release 2.0.1-alpha - UNRELEASED
NEW FEATURES
+ HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
+
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1344419
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Mon Jun 4 17:43:51 2012
@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSO
private long initialFileSize = 0; // at time of file open
private Progressable progress;
private final short blockReplication; // replication factor of file
+ private boolean shouldSyncBlock = false; // force blocks to disk upon close
private class Packet {
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
- boolean lastPacketInBlock; // is this the last packet in block?
+ private boolean lastPacketInBlock; // is this the last packet in block?
+ boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
@@ -245,7 +247,7 @@ public class DFSOutputStream extends FSO
buffer.mark();
PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
header.putInBuffer(buffer);
buffer.reset();
@@ -1249,6 +1251,7 @@ public class DFSOutputStream extends FSO
long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
this(dfsClient, src, blockSize, progress, checksum, replication);
+ this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
@@ -1431,6 +1434,7 @@ public class DFSOutputStream extends FSO
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@@ -1456,6 +1460,24 @@ public class DFSOutputStream extends FSO
*/
@Override
public void hflush() throws IOException {
+ flushOrSync(false);
+ }
+
+ /**
+ * The expected semantics is all data have flushed out to all replicas
+ * and all replicas have done posix fsync equivalent - ie the OS has
+ * flushed it to the disk device (but the disk may have it in its cache).
+ *
+ * Note that only the current block is flushed to the disk device.
+ * To guarantee durable sync across block boundaries the stream should
+ * be created with {@link CreateFlag#SYNC_BLOCK}.
+ */
+ @Override
+ public void hsync() throws IOException {
+ flushOrSync(true);
+ }
+
+ private void flushOrSync(boolean isSync) throws IOException {
dfsClient.checkOpen();
isClosed();
try {
@@ -1483,7 +1505,13 @@ public class DFSOutputStream extends FSO
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
- waitAndQueueCurrentPacket();
+ if (isSync && currentPacket == null) {
+ // Nothing to send right now,
+ // but sync was requested.
+ // Send an empty packet
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ }
} else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
@@ -1493,8 +1521,21 @@ public class DFSOutputStream extends FSO
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
- // just discard the current packet since it is already been sent.
- currentPacket = null;
+ if (isSync && bytesCurBlock > 0) {
+ // Nothing to send right now,
+ // and the block was partially written,
+ // and sync was requested.
+ // So send an empty sync packet.
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ } else {
+ // just discard the current packet since it has already been sent.
+ currentPacket = null;
+ }
+ }
+ if (currentPacket != null) {
+ currentPacket.syncBlock = isSync;
+ waitAndQueueCurrentPacket();
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
@@ -1546,18 +1587,6 @@ public class DFSOutputStream extends FSO
}
/**
- * The expected semantics is all data have flushed out to all replicas
- * and all replicas have done posix fsync equivalent - ie the OS has
- * flushed it to the disk device (but the disk may have it in its cache).
- *
- * Right now by default it is implemented as hflush
- */
- @Override
- public synchronized void hsync() throws IOException {
- hflush();
- }
-
- /**
* @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
*/
@Deprecated
@@ -1681,6 +1710,7 @@ public class DFSOutputStream extends FSO
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
}
flushInternal(); // flush all data to Datanodes
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
Mon Jun 4 17:43:51 2012
@@ -223,12 +223,19 @@ public class DistributedFileSystem exten
@Override
public HdfsDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return create(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress);
+ }
+
+ @Override
+ public HdfsDataOutputStream create(Path f, FsPermission permission,
+ EnumSet<CreateFlag> cflags, int bufferSize, short replication, long
blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
- final EnumSet<CreateFlag> cflags = overwrite?
- EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE);
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
replication, blockSize, progress, bufferSize);
return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ public class DistributedFileSystem exten
/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
+ @Override
public HdfsDataOutputStream createNonRecursive(Path f, FsPermission
permission,
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
Mon Jun 4 17:43:51 2012
@@ -40,6 +40,7 @@ public class PacketHeader {
.setSeqno(0)
.setLastPacketInBlock(false)
.setDataLen(0)
+ .setSyncBlock(false)
.build().getSerializedSize();
public static final int PKT_HEADER_LEN =
6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public class PacketHeader {
}
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen) {
+ boolean lastPacketInBlock, int dataLen, boolean
syncBlock) {
this.packetLen = packetLen;
proto = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
.setDataLen(dataLen)
+ .setSyncBlock(syncBlock)
.build();
}
@@ -81,6 +83,10 @@ public class PacketHeader {
return packetLen;
}
+ public boolean getSyncBlock() {
+ return proto.getSyncBlock();
+ }
+
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Mon Jun 4 17:43:51 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable
private final BlockConstructionStage stage;
private final boolean isTransfer;
+ private boolean syncOnClose;
+
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable
* close files.
*/
public void close() throws IOException {
-
IOException ioe = null;
+ if (syncOnClose && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
- if (datanode.getDnConf().syncOnClose && (cout instanceof
FileOutputStream)) {
+ if (syncOnClose && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
checksumOut.close();
checksumOut = null;
@@ -267,8 +274,10 @@ class BlockReceiver implements Closeable
try {
if (out != null) {
out.flush();
- if (datanode.getDnConf().syncOnClose && (out instanceof
FileOutputStream)) {
+ if (syncOnClose && (out instanceof FileOutputStream)) {
+ long start = Util.now();
((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
}
out.close();
out = null;
@@ -290,12 +299,25 @@ class BlockReceiver implements Closeable
* Flush block data and metadata files to disk.
* @throws IOException
*/
- void flush() throws IOException {
+ void flushOrSync(boolean isSync) throws IOException {
+ if (isSync && (out != null || checksumOut != null)) {
+ datanode.metrics.incrFsyncCount();
+ }
if (checksumOut != null) {
checksumOut.flush();
+ if (isSync && (cout instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)cout).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
if (out != null) {
out.flush();
+ if (isSync && (out instanceof FileOutputStream)) {
+ long start = Util.now();
+ ((FileOutputStream)out).getChannel().force(true);
+ datanode.metrics.addFsync(Util.now() - start);
+ }
}
}
@@ -533,7 +555,9 @@ class BlockReceiver implements Closeable
header.getOffsetInBlock(),
header.getSeqno(),
header.isLastPacketInBlock(),
- header.getDataLen(), endOfHeader);
+ header.getDataLen(),
+ header.getSyncBlock(),
+ endOfHeader);
}
/**
@@ -549,15 +573,19 @@ class BlockReceiver implements Closeable
* returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+ boolean lastPacketInBlock, int len, boolean syncBlock,
+ int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
+ " syncBlock " + syncBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
-
+ // make sure the block gets sync'ed upon close
+ this.syncOnClose |= syncBlock && lastPacketInBlock;
+
// update received bytes
long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
@@ -587,6 +615,10 @@ class BlockReceiver implements Closeable
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " +
block);
}
+ // flush unless close() would flush anyway
+ if (syncBlock && !lastPacketInBlock) {
+ flushOrSync(true);
+ }
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@@ -677,8 +709,8 @@ class BlockReceiver implements Closeable
);
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
- /// flush entire packet
- flush();
+ /// flush entire packet, sync unless close() will sync
+ flushOrSync(syncBlock && !lastPacketInBlock);
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ class BlockReceiver implements Closeable
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
+ syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ class BlockReceiver implements Closeable
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatnode or TRANSFER_FINALIZED
- // Finalize the block. Does this fsync()?
+ // Finalize the block.
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
Mon Jun 4 17:43:51 2012
@@ -701,8 +701,9 @@ class BlockSender implements java.io.Clo
*/
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
+ // both syncBlock and syncPacket are false
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
- (dataLen == 0), dataLen);
+ (dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
Mon Jun 4 17:43:51 2012
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong writesFromLocalClient;
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
+
+ @Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate fsync;
+
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -151,6 +155,14 @@ public class DataNodeMetrics {
blocksRead.incr();
}
+ public void incrFsyncCount() {
+ fsyncCount.incr();
+ }
+
+ public void addFsync(long latency) {
+ fsync.add(latency);
+ }
+
public void shutdown() {
DefaultMetricsSystem.shutdown();
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
Mon Jun 4 17:43:51 2012
@@ -113,6 +113,7 @@ message PacketHeaderProto {
required sfixed64 seqno = 2;
required bool lastPacketInBlock = 3;
required sfixed32 dataLen = 4;
+ optional bool syncBlock = 5 [default = false];
}
enum Status {
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
Mon Jun 4 17:43:51 2012
@@ -139,7 +139,7 @@ public class AppendTestUtil {
/**
* create a buffer that contains the entire test file data.
*/
- static byte[] initBuffer(int size) {
+ public static byte[] initBuffer(int size) {
if (seed == -1)
seed = nextLong();
return randomBytes(seed, size);
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1346050&r1=1346049&r2=1346050&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Mon Jun 4 17:43:51 2012
@@ -159,7 +159,8 @@ public class TestDataTransferProtocol ex
block.getNumBytes(), // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
- 0); // chunk length
+ 0, // chunk length
+ false); // sync block
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
@@ -402,7 +403,8 @@ public class TestDataTransferProtocol ex
0, // offset in block,
100, // seqno
false, // last packet
- -1 - random.nextInt(oneMil)); // bad datalen
+ -1 - random.nextInt(oneMil), // bad datalen
+ false);
hdr.write(sendOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
@@ -424,7 +426,8 @@ public class TestDataTransferProtocol ex
0, // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
- 0); // chunk length
+ 0, // chunk length
+ false);
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
sendOut.flush();
@@ -508,8 +511,8 @@ public class TestDataTransferProtocol ex
1024, // OffsetInBlock
100, // sequencenumber
false, // lastPacketInBlock
- 4096); // chunk length
-
+ 4096, // chunk length
+ false);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
hdr.write(new DataOutputStream(baos));