This is an automated email from the ASF dual-hosted git repository. pepperjo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-crail.git
commit 7c4557df39a955dce9dde15762af95fee0eb6f7f Author: Malte Brodmann <mbrodm...@student.ethz.ch> AuthorDate: Fri Feb 5 15:37:24 2021 +0100 implement RPC to remove datanodes --- .../apache/crail/metadata/DataNodeStatistics.java | 14 ++++- .../java/org/apache/crail/rpc/RpcConnection.java | 4 ++ .../java/org/apache/crail/rpc/RpcDispatcher.java | 7 +++ .../main/java/org/apache/crail/rpc/RpcErrors.java | 3 +- .../org/apache/crail/rpc/RpcRemoveDataNode.java | 5 ++ .../rpc/darpc/DaRPCNameNodeConnection.java | 19 +++++++ .../namenode/rpc/darpc/DaRPCNameNodeRequest.java | 17 ++++++ .../namenode/rpc/darpc/DaRPCNameNodeResponse.java | 25 ++++++++- .../crail/namenode/rpc/tcp/TcpNameNodeRequest.java | 21 +++++++- .../namenode/rpc/tcp/TcpNameNodeResponse.java | 22 +++++++- .../crail/namenode/rpc/tcp/TcpRpcConnection.java | 12 +++++ .../java/org/apache/crail/rpc/RpcProtocol.java | 7 ++- .../org/apache/crail/rpc/RpcRequestMessage.java | 60 +++++++++++++++++++++- .../org/apache/crail/rpc/RpcResponseMessage.java | 49 ++++++++++++++++++ 14 files changed, 256 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java index fc52f14..00eebd9 100644 --- a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java +++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java @@ -22,25 +22,29 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; public class DataNodeStatistics { - public static final int CSIZE = 12; + public static final int CSIZE = 14; private long serviceId; private int freeBlockCount; + private short status; public DataNodeStatistics(){ this.serviceId = 0; this.freeBlockCount = 0; + this.status = 0; } public int write(ByteBuffer buffer){ buffer.putLong(serviceId); buffer.putInt(freeBlockCount); + buffer.putShort(status); return CSIZE; } public void update(ByteBuffer buffer) throws UnknownHostException { this.serviceId = buffer.getLong(); this.freeBlockCount = buffer.getInt(); + this.status = buffer.getShort(); } public int getFreeBlockCount() { @@ -51,6 +55,14 @@ public class DataNodeStatistics { this.freeBlockCount = blockCount; } + public short getStatus() { + return this.status; + } + + public void setStatus(short status) { + this.status = status; + } + public void setStatistics(DataNodeStatistics statistics) { this.serviceId = statistics.getServiceId(); this.freeBlockCount = statistics.getFreeBlockCount(); diff --git a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java index d434fdb..91ae152 100644 --- a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java +++ b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java @@ -19,6 +19,7 @@ package org.apache.crail.rpc; import java.io.IOException; +import java.net.InetAddress; import org.apache.crail.CrailNodeType; import org.apache.crail.metadata.BlockInfo; @@ -59,6 +60,9 @@ public interface RpcConnection { public abstract RpcFuture<RpcPing> pingNameNode() throws Exception; + + public abstract RpcFuture<RpcRemoveDataNode> removeDataNode( + InetAddress ipaddr, int port) throws Exception; public abstract void close() throws Exception; diff --git a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java index f8d38f5..605193c 100644 --- a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java +++ b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java @@ -19,6 +19,7 @@ package org.apache.crail.rpc; import java.io.IOException; +import java.net.InetAddress; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.crail.CrailNodeType; @@ -135,6 +136,12 @@ public class RpcDispatcher implements RpcConnection { } @Override + public RpcFuture<RpcRemoveDataNode> removeDataNode( + InetAddress ipaddr, int port) throws Exception { + return connections[0].removeDataNode(ipaddr, port); + } + + @Override public void close() throws Exception { for (RpcConnection connection : connections){ connection.close(); diff --git a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java index 367e772..199f130 100644 --- a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java +++ b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java @@ -55,7 +55,8 @@ public class RpcErrors { public static short ERR_DIR_LOCATION_AFFINITY_MISMATCH = 26; public static short ERR_ADD_BLOCK_FAILED = 27; public static short ERR_CREATE_FILE_BUG = 28; - + public static short ERR_DATANODE_STOP = 29; + static { messages[ERR_OK] = "ERROR: No error, all fine"; messages[ERR_UNKNOWN] = "ERROR: Unknown error"; diff --git a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java new file mode 100644 index 0000000..ad3db57 --- /dev/null +++ b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java @@ -0,0 +1,5 @@ +package org.apache.crail.rpc; + +public interface RpcRemoveDataNode extends RpcResponse { + public short getData(); +} diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java index 26409f7..7b53855 100644 --- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java +++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java @@ -19,6 +19,7 @@ package org.apache.crail.namenode.rpc.darpc; import java.io.IOException; +import java.net.InetAddress; import org.apache.crail.CrailNodeType; import org.apache.crail.conf.CrailConstants; @@ -35,6 +36,7 @@ import org.apache.crail.rpc.RpcGetDataNode; import org.apache.crail.rpc.RpcGetFile; import org.apache.crail.rpc.RpcGetLocation; import org.apache.crail.rpc.RpcPing; +import org.apache.crail.rpc.RpcRemoveDataNode; import org.apache.crail.rpc.RpcProtocol; import org.apache.crail.rpc.RpcRenameFile; import org.apache.crail.rpc.RpcRequestMessage; @@ -268,6 +270,23 @@ public class DaRPCNameNodeConnection implements RpcConnection { return nameNodeFuture; } + + @Override + public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress ipaddr, int port) throws Exception { + + RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(ipaddr, port); + DaRPCNameNodeRequest request = new DaRPCNameNodeRequest(removeDataNodeReq); + request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE); + + RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes(); + DaRPCNameNodeResponse response = new DaRPCNameNodeResponse(removeDataNodeRes); + + DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future = issueRPC(request, response); + + DaRPCNameNodeFuture<RpcRemoveDataNode> nameNodeFuture = new DaRPCNameNodeFuture<RpcRemoveDataNode>(future, removeDataNodeRes); + + return nameNodeFuture; + } @Override public void close() throws Exception { diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java index f87ba9d..f496edf 100644 --- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java +++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java @@ -45,6 +45,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { private RpcRequestMessage.GetDataNodeReq getDataNodeReq; private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq; private RpcRequestMessage.PingNameNodeReq pingNameNodeReq; + private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq; public DaRPCNameNodeRequest() { this.cmd = 0; @@ -60,6 +61,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq(); this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq(); this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(); + this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(); } public DaRPCNameNodeRequest(RpcRequestMessage.CreateFileReq message) { @@ -115,6 +117,11 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { this.type = message.getType(); this.pingNameNodeReq = message; } + + public DaRPCNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) { + this.type = message.getType(); + this.removeDataNodeReq = message; + } public void setCommand(short command) { this.cmd = command; @@ -163,6 +170,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { case RpcProtocol.REQ_PING_NAMENODE: written += pingNameNodeReq.write(buffer); break; + case RpcProtocol.REQ_REMOVE_DATANODE: + written += removeDataNodeReq.write(buffer); + break; } return written; @@ -206,6 +216,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { case RpcProtocol.REQ_PING_NAMENODE: pingNameNodeReq.update(buffer); break; + case RpcProtocol.REQ_REMOVE_DATANODE: + removeDataNodeReq.update(buffer); + break; } } @@ -260,4 +273,8 @@ public class DaRPCNameNodeRequest implements DaRPCMessage { public RpcRequestMessage.PingNameNodeReq pingNameNode(){ return this.pingNameNodeReq; } + + public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){ + return this.removeDataNodeReq; + } } diff --git a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java index a4e75f4..bd1d84b 100644 --- a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java +++ b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java @@ -40,6 +40,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { private RpcResponseMessage.GetLocationRes getLocationRes; private RpcResponseMessage.GetDataNodeRes getDataNodeRes; private RpcResponseMessage.PingNameNodeRes pingNameNodeRes; + private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes; public DaRPCNameNodeResponse() { this.type = 0; @@ -54,6 +55,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { this.getLocationRes = new RpcResponseMessage.GetLocationRes(); this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes(); this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes(); + this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes(); } public DaRPCNameNodeResponse(RpcResponseMessage.VoidRes message) { @@ -100,6 +102,11 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { this.type = message.getType(); this.pingNameNodeRes = message; } + + public DaRPCNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) { + this.type = message.getType(); + this.removeDataNodeRes = message; + } public void setType(short type) throws Exception { this.type = type; @@ -149,6 +156,11 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { throw new Exception("Response type not set"); } break; + case RpcProtocol.RES_REMOVE_DATANODE: + if (removeDataNodeRes == null){ + throw new Exception("Response type not set"); + } + break; } } @@ -188,7 +200,10 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { break; case RpcProtocol.RES_PING_NAMENODE: written += pingNameNodeRes.write(buffer); - break; + break; + case RpcProtocol.RES_REMOVE_DATANODE: + written += removeDataNodeRes.write(buffer); + break; } return written; @@ -235,6 +250,10 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { pingNameNodeRes.update(buffer); pingNameNodeRes.setError(error); break; + case RpcProtocol.RES_REMOVE_DATANODE: + removeDataNodeRes.update(buffer); + removeDataNodeRes.setError(error); + break; } } @@ -285,4 +304,8 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, RpcNameNodeState { public RpcResponseMessage.PingNameNodeRes pingNameNode(){ return this.pingNameNodeRes; } + + public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){ + return this.removeDataNodeRes; + } } diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java index d22e09a..37e5503 100644 --- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java +++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java @@ -45,6 +45,7 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag private RpcRequestMessage.GetDataNodeReq getDataNodeReq; private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq; private RpcRequestMessage.PingNameNodeReq pingNameNodeReq; + private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq; public TcpNameNodeRequest() { this.cmd = 0; @@ -60,7 +61,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq(); this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq(); this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq(); - } + this.removeDataNodeReq = new RpcRequestMessage.RemoveDataNodeReq(); + } public TcpNameNodeRequest(RpcRequestMessage.CreateFileReq message) { this.type = message.getType(); @@ -115,7 +117,12 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag this.type = message.getType(); this.pingNameNodeReq = message; } - + + public TcpNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) { + this.type = message.getType(); + this.removeDataNodeReq = message; + } + public void setCommand(short command) { this.cmd = command; } @@ -163,6 +170,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag case RpcProtocol.REQ_PING_NAMENODE: written += pingNameNodeReq.write(buffer); break; + case RpcProtocol.REQ_REMOVE_DATANODE: + written += removeDataNodeReq.write(buffer); + break; } return written; @@ -206,6 +216,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag case RpcProtocol.REQ_PING_NAMENODE: pingNameNodeReq.update(buffer); break; + case RpcProtocol.REQ_REMOVE_DATANODE: + removeDataNodeReq.update(buffer); + break; } } @@ -260,4 +273,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage implements NaRPCMessag public RpcRequestMessage.PingNameNodeReq pingNameNode(){ return this.pingNameNodeReq; } + + public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){ + return this.removeDataNodeReq; + } } diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java index 73912df..8a01a19 100644 --- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java +++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java @@ -43,6 +43,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo private RpcResponseMessage.GetLocationRes getLocationRes; private RpcResponseMessage.GetDataNodeRes getDataNodeRes; private RpcResponseMessage.PingNameNodeRes pingNameNodeRes; + private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes; public TcpNameNodeResponse() { this.type = 0; @@ -56,6 +57,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo this.getLocationRes = new RpcResponseMessage.GetLocationRes(); this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes(); this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes(); + this.removeDataNodeRes = new RpcResponseMessage.RemoveDataNodeRes(); } public TcpNameNodeResponse(RpcResponseMessage.VoidRes message) { @@ -102,6 +104,11 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo this.type = message.getType(); this.pingNameNodeRes = message; } + + public TcpNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes message) { + this.type = message.getType(); + this.removeDataNodeRes = message; + } public void setType(short type) throws Exception { this.type = type; @@ -143,7 +150,10 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo break; case RpcProtocol.RES_PING_NAMENODE: written += pingNameNodeRes.write(buffer); - break; + break; + case RpcProtocol.RES_REMOVE_DATANODE: + written += removeDataNodeRes.write(buffer); + break; } return written; @@ -189,7 +199,11 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo case RpcProtocol.RES_PING_NAMENODE: pingNameNodeRes.update(buffer); pingNameNodeRes.setError(error); - break; + break; + case RpcProtocol.RES_REMOVE_DATANODE: + removeDataNodeRes.update(buffer); + removeDataNodeRes.setError(error); + break; } } @@ -240,4 +254,8 @@ public class TcpNameNodeResponse extends RpcResponseMessage implements RpcNameNo public RpcResponseMessage.PingNameNodeRes pingNameNode(){ return this.pingNameNodeRes; } + + public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){ + return this.removeDataNodeRes; + } } diff --git a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java index decdf1c..6f96e4f 100644 --- a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java +++ b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java @@ -31,6 +31,7 @@ import org.apache.crail.utils.CrailUtils; import org.slf4j.Logger; import java.io.IOException; +import java.net.InetAddress; public class TcpRpcConnection implements RpcConnection { static private final Logger LOG = CrailUtils.getLogger(); @@ -184,4 +185,15 @@ public class TcpRpcConnection implements RpcConnection { return new TcpFuture<RpcPing>(future, resp); } + public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress addr, int port) throws Exception { + RpcRequestMessage.RemoveDataNodeReq req = new RpcRequestMessage.RemoveDataNodeReq(addr, port); + RpcResponseMessage.RemoveDataNodeRes resp = new RpcResponseMessage.RemoveDataNodeRes(); + + TcpNameNodeRequest request = new TcpNameNodeRequest(req); + TcpNameNodeResponse response = new TcpNameNodeResponse(resp); + request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE); + NaRPCFuture<TcpNameNodeRequest, TcpNameNodeResponse> future = endpoint.issueRequest(request, response); + return new TcpFuture<RpcRemoveDataNode>(future, resp); + } + } \ No newline at end of file diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java index d5a339d..10c32d3 100644 --- a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java +++ b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java @@ -40,6 +40,7 @@ public class RpcProtocol extends RpcErrors { public static final short CMD_DUMP_NAMENODE = 10; public static final short CMD_PING_NAMENODE = 11; public static final short CMD_GET_DATANODE = 12; + public static final short CMD_REMOVE_DATANODE = 13; //request types public static final short REQ_CREATE_FILE = 1; @@ -53,7 +54,8 @@ public class RpcProtocol extends RpcErrors { public static final short REQ_DUMP_NAMENODE = 10; public static final short REQ_PING_NAMENODE = 11; public static final short REQ_GET_DATANODE = 12; - + public static final short REQ_REMOVE_DATANODE = 13; + //response types public static final short RES_VOID = 1; public static final short RES_CREATE_FILE = 2; @@ -64,6 +66,7 @@ public class RpcProtocol extends RpcErrors { public static final short RES_GET_LOCATION = 7; public static final short RES_PING_NAMENODE = 9; public static final short RES_GET_DATANODE = 10; + public static final short RES_REMOVE_DATANODE = 11; static { @@ -79,6 +82,7 @@ public class RpcProtocol extends RpcErrors { requestTypes[CMD_DUMP_NAMENODE] = REQ_DUMP_NAMENODE; requestTypes[CMD_PING_NAMENODE] = REQ_PING_NAMENODE; requestTypes[CMD_GET_DATANODE] = REQ_GET_DATANODE; + requestTypes[CMD_REMOVE_DATANODE] = REQ_REMOVE_DATANODE; responseTypes[0] = 0; responseTypes[CMD_CREATE_FILE] = RES_CREATE_FILE; @@ -92,6 +96,7 @@ public class RpcProtocol extends RpcErrors { responseTypes[CMD_DUMP_NAMENODE] = RES_VOID; responseTypes[CMD_PING_NAMENODE] = RES_PING_NAMENODE; responseTypes[CMD_GET_DATANODE] = RES_GET_DATANODE; + responseTypes[CMD_REMOVE_DATANODE] = RES_REMOVE_DATANODE; } diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java index be043a2..7d1f414 100644 --- a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java +++ b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java @@ -18,6 +18,8 @@ package org.apache.crail.rpc; +import java.io.IOException; +import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -571,7 +573,63 @@ public class RpcRequestMessage { public void update(ByteBuffer buffer) { op = buffer.getInt(); } - } + } + + public static class RemoveDataNodeReq implements RpcProtocol.NameNodeRpcMessage { + private InetAddress ipAddr; + private int port; + + public RemoveDataNodeReq(){ + this.ipAddr = null; + this.port = 0; + } + + public RemoveDataNodeReq(InetAddress addr, int port){ + this.ipAddr = addr; + this.port = port; + } + + public int size() { + // sizeof(ip-addr) + sizeof(port) + return 4 + Integer.BYTES; + } + + public short getType(){ + return RpcProtocol.REQ_REMOVE_DATANODE; + } + + public int write(ByteBuffer buffer) throws IOException { + int size = size(); + + checkSize(buffer.remaining()); + + buffer.put(this.getIPAddress().getAddress()); + buffer.putInt(this.port()); + return size; + } + + private void checkSize(int remaining) throws IOException { + if(this.size() > remaining) + throw new IOException("Only " + remaining + " remaining bytes stored in buffer, however " + this.size() + " bytes are required"); + } + + public void update(ByteBuffer buffer) throws IOException { + checkSize(buffer.remaining()); + + byte[] b = new byte[4]; + buffer.get(b); + this.ipAddr = InetAddress.getByAddress(b); + this.port = buffer.getInt(); + } + + public InetAddress getIPAddress(){ + return this.ipAddr; + } + + public int port(){ + return this.port; + } + } } diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java index d4175af..32697ea 100644 --- a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java +++ b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java @@ -598,6 +598,10 @@ public class RpcResponseMessage { public void setFreeBlockCount(int blockCount) { this.statistics.setFreeBlockCount(blockCount); } + + public void setStatus(short status) { + this.statistics.setStatus(status); + } public short getError(){ return 0; @@ -652,4 +656,49 @@ public class RpcResponseMessage { this.error = error; } } + + public static class RemoveDataNodeRes implements RpcProtocol.NameNodeRpcMessage, RpcRemoveDataNode { + public static int CSIZE = Short.BYTES; + + private short data; + private short error; + + public RemoveDataNodeRes() { + this.data = 0; + this.error = 0; + } + + public int size() { + return CSIZE; + } + + public short getType(){ + return RpcProtocol.RES_REMOVE_DATANODE; + } + + public int write(ByteBuffer buffer) { + buffer.putShort(data); + return CSIZE; + } + + public void update(ByteBuffer buffer) { + data = buffer.getShort(); + } + + public short getData(){ + return data; + } + + public void setData(short data) { + this.data = data; + } + + public short getError(){ + return error; + } + + public void setError(short error) { + this.error = error; + } + } }