Author: suresh
Date: Sun Feb 5 01:39:30 2012
New Revision: 1240653
URL: http://svn.apache.org/viewvc?rev=1240653&view=rev
Log:
HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
Contributed by Suresh Srinivas.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
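For illustration, here is a minimal sketch (not part of this commit) of how a datanode heartbeat is assembled against the new per-storage report message; it mirrors the client-side translator change below and assumes that registration and the usage counters (capacity, dfsUsed, remaining, blockPoolUsed, xmitsInProgress, xceiverCount, failedVolumes) are already in scope.

  // Sketch only: one StorageReportProto per storage, attached to the
  // heartbeat through the new repeated "reports" field.
  // (types from org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos)
  StorageReportProto report = StorageReportProto.newBuilder()
      .setStorageID(registration.getStorageID())
      .setCapacity(capacity)
      .setDfsUsed(dfsUsed)
      .setRemaining(remaining)
      .setBlockPoolUsed(blockPoolUsed)
      .build();

  HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
      .setRegistration(PBHelper.convert(registration))
      .addReports(report)                 // repeated: further storages can be added here
      .setXmitsInProgress(xmitsInProgress)
      .setXceiverCount(xceiverCount)
      .setFailedVolumes(failedVolumes)
      .build();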
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1240653&r1=1240652&r2=1240653&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Feb 5 01:39:30 2012
@@ -45,6 +45,9 @@ Trunk (unreleased changes)
HDFS-2697. Move RefreshAuthPolicy, RefreshUserMappings, GetUserMappings
protocol to protocol buffers. (jitendra)
+ HDFS-2880. Protobuf changes in DatanodeProtocol to add multiple storages.
+ (suresh)
+
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multiple
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1240653&r1=1240652&r2=1240653&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Sun Feb 5 01:39:30 2012
@@ -46,6 +46,9 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -169,11 +172,16 @@ public class DatanodeProtocolClientSideT
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException {
- HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
- .setRegistration(PBHelper.convert(registration)).setCapacity(capacity)
+ StorageReportProto report = StorageReportProto.newBuilder()
+ .setBlockPoolUsed(blockPoolUsed).setCapacity(capacity)
.setDfsUsed(dfsUsed).setRemaining(remaining)
- .setBlockPoolUsed(blockPoolUsed).setXmitsInProgress(xmitsInProgress)
- .setXceiverCount(xceiverCount).setFailedVolumes(failedVolumes).build();
+ .setStorageID(registration.getStorageID()).build();
+
+ HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration)).addReports(report)
+ .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+ .setFailedVolumes(failedVolumes)
+ .build();
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, req);
@@ -192,15 +200,17 @@ public class DatanodeProtocolClientSideT
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, long[] blocks) throws IOException {
- BlockReportRequestProto.Builder builder = BlockReportRequestProto
- .newBuilder().setRegistration(PBHelper.convert(registration))
- .setBlockPoolId(poolId);
+ StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
+ .newBuilder().setStorageID(registration.getStorageID());
+
if (blocks != null) {
for (int i = 0; i < blocks.length; i++) {
- builder.addBlocks(blocks[i]);
+ reportBuilder.addBlocks(blocks[i]);
}
}
- BlockReportRequestProto req = builder.build();
+ BlockReportRequestProto req = BlockReportRequestProto
+ .newBuilder().setRegistration(PBHelper.convert(registration))
+ .setBlockPoolId(poolId).addReports(reportBuilder.build()).build();
BlockReportResponseProto resp;
try {
resp = rpcProxy.blockReport(NULL_CONTROLLER, req);
@@ -211,19 +221,21 @@ public class DatanodeProtocolClientSideT
}
@Override
- public void blockReceivedAndDeleted(DatanodeRegistration registration,
+ public void blockReceivedAndDeleted(DatanodeRegistration reg,
String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
throws IOException {
- BlockReceivedAndDeletedRequestProto.Builder builder =
- BlockReceivedAndDeletedRequestProto.newBuilder()
- .setRegistration(PBHelper.convert(registration))
- .setBlockPoolId(poolId);
+ StorageReceivedDeletedBlocksProto.Builder builder =
+ StorageReceivedDeletedBlocksProto.newBuilder()
+ .setStorageID(reg.getStorageID());
if (receivedAndDeletedBlocks != null) {
for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
builder.addBlocks(PBHelper.convert(receivedAndDeletedBlocks[i]));
}
}
- BlockReceivedAndDeletedRequestProto req = builder.build();
+ BlockReceivedAndDeletedRequestProto req =
+ BlockReceivedAndDeletedRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(reg))
+ .setBlockPoolId(poolId).addBlocks(builder.build()).build();
try {
rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, req);
} catch (ServiceException se) {
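In the same spirit, a hedged sketch of the new block report shape: block IDs are now grouped per storage in a StorageBlockReportProto and attached to BlockReportRequestProto through the repeated "reports" field. The storageID, registration, poolId, and blocks variables below are assumed to be in scope.

  // Sketch only: group the block IDs of one storage into a StorageBlockReportProto.
  StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto.newBuilder()
      .setStorageID(storageID);
  for (long blockId : blocks) {           // blocks: long[] encoding the block list
    reportBuilder.addBlocks(blockId);
  }

  BlockReportRequestProto req = BlockReportRequestProto.newBuilder()
      .setRegistration(PBHelper.convert(registration))
      .setBlockPoolId(poolId)
      .addReports(reportBuilder.build())  // one entry per storage
      .build();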
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1240653&r1=1240652&r2=1240653&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Sun Feb 5 01:39:30 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@@ -98,9 +98,10 @@ public class DatanodeProtocolServerSideT
HeartbeatRequestProto request) throws ServiceException {
DatanodeCommand[] cmds = null;
try {
+ StorageReportProto report = request.getReports(0);
cmds = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
- request.getCapacity(), request.getDfsUsed(), request.getRemaining(),
- request.getBlockPoolUsed(), request.getXmitsInProgress(),
+ report.getCapacity(), report.getDfsUsed(), report.getRemaining(),
+ report.getBlockPoolUsed(), request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) {
throw new ServiceException(e);
@@ -121,7 +122,7 @@ public class DatanodeProtocolServerSideT
public BlockReportResponseProto blockReport(RpcController controller,
BlockReportRequestProto request) throws ServiceException {
DatanodeCommand cmd = null;
- List<Long> blockIds = request.getBlocksList();
+ List<Long> blockIds = request.getReports(0).getBlocksList();
long[] blocks = new long[blockIds.size()];
for (int i = 0; i < blockIds.size(); i++) {
blocks[i] = blockIds.get(i);
@@ -144,7 +145,8 @@ public class DatanodeProtocolServerSideT
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request)
throws ServiceException {
- List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocksList();
+ List<ReceivedDeletedBlockInfoProto> rdbip = request.getBlocks(0)
+ .getBlocksList();
ReceivedDeletedBlockInfo[] info =
new ReceivedDeletedBlockInfo[rdbip.size()];
for (int i = 0; i < rdbip.size(); i++) {
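The server-side translator above reads only the first report (index 0) for now. As an illustrative sketch only, protobuf's standard generated accessors for repeated fields would let a receiver walk every storage report once a datanode sends more than one:

  // Sketch only, assuming protobuf's generated getReportsList()/getBlocksList()
  // accessors for the repeated fields added in this change.
  for (StorageBlockReportProto storageReport : request.getReportsList()) {
    String storageID = storageReport.getStorageID();
    List<Long> blockIds = storageReport.getBlocksList();
    // hand the per-storage block list to the block manager here
  }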
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1240653&r1=1240652&r2=1240653&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Sun Feb 5 01:39:30 2012
@@ -36,6 +36,19 @@ message DatanodeRegistrationProto {
}
/**
+ * Represents a storage available on the datanode
+ */
+message DatanodeStorageProto {
+ enum StorageState {
+ NORMAL = 0;
+ READ_ONLY = 1;
+ }
+
+ required string storageID = 1; // Unique identifier for the storage
+ optional StorageState state = 2 [default = NORMAL];
+}
+
+/**
* Commands sent from namenode to the datanodes
*/
message DatanodeCommandProto {
@@ -136,6 +149,7 @@ message UpgradeCommandProto {
*/
message RegisterDatanodeRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
+ repeated DatanodeStorageProto storages = 2; // Storages on the datanode
}
/**
@@ -159,13 +173,19 @@ message RegisterDatanodeResponseProto {
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
- required uint64 capacity = 2;
- required uint64 dfsUsed = 3;
- required uint64 remaining = 4;
- required uint64 blockPoolUsed = 5;
- required uint32 xmitsInProgress = 6;
- required uint32 xceiverCount = 7;
- required uint32 failedVolumes = 8;
+ repeated StorageReportProto reports = 2;
+ optional uint32 xmitsInProgress = 3 [ default = 0 ];
+ optional uint32 xceiverCount = 4 [ default = 0 ];
+ optional uint32 failedVolumes = 5 [ default = 0 ];
+}
+
+message StorageReportProto {
+ required string storageID = 1;
+ optional bool failed = 2 [ default = false ];
+ optional uint64 capacity = 3 [ default = 0 ];
+ optional uint64 dfsUsed = 4 [ default = 0 ];
+ optional uint64 remaining = 5 [ default = 0 ];
+ optional uint64 blockPoolUsed = 6 [ default = 0 ];
}
/**
@@ -185,7 +205,15 @@ message HeartbeatResponseProto {
message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2;
- repeated uint64 blocks = 3 [packed=true];
+ repeated StorageBlockReportProto reports = 3;
+}
+
+/**
+ * Report of blocks in a storage
+ */
+message StorageBlockReportProto {
+ required string storageID = 1; // Storage ID
+ repeated uint64 blocks = 2 [packed=true];
}
/**
@@ -208,6 +236,14 @@ message ReceivedDeletedBlockInfoProto {
}
/**
+ * List of blocks received and deleted for a storage.
+ */
+message StorageReceivedDeletedBlocksProto {
+ required string storageID = 1;
+ repeated ReceivedDeletedBlockInfoProto blocks = 2;
+}
+
+/**
* registration - datanode registration information
* blockPoolID - block pool ID of the reported blocks
* blocks - Received/deleted block list
@@ -215,7 +251,7 @@ message ReceivedDeletedBlockInfoProto {
message BlockReceivedAndDeletedRequestProto {
required DatanodeRegistrationProto registration = 1;
required string blockPoolId = 2;
- repeated ReceivedDeletedBlockInfoProto blocks = 3;
+ repeated StorageReceivedDeletedBlocksProto blocks = 3;
}
/**