This is an automated email from the ASF dual-hosted git repository.

pepperjo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-crail.git

commit 7c4557df39a955dce9dde15762af95fee0eb6f7f
Author: Malte Brodmann <mbrodm...@student.ethz.ch>
AuthorDate: Fri Feb 5 15:37:24 2021 +0100

    implement RPC to remove datanodes
---
 .../apache/crail/metadata/DataNodeStatistics.java  | 14 ++++-
 .../java/org/apache/crail/rpc/RpcConnection.java   |  4 ++
 .../java/org/apache/crail/rpc/RpcDispatcher.java   |  7 +++
 .../main/java/org/apache/crail/rpc/RpcErrors.java  |  3 +-
 .../org/apache/crail/rpc/RpcRemoveDataNode.java    |  5 ++
 .../rpc/darpc/DaRPCNameNodeConnection.java         | 19 +++++++
 .../namenode/rpc/darpc/DaRPCNameNodeRequest.java   | 17 ++++++
 .../namenode/rpc/darpc/DaRPCNameNodeResponse.java  | 25 ++++++++-
 .../crail/namenode/rpc/tcp/TcpNameNodeRequest.java | 21 +++++++-
 .../namenode/rpc/tcp/TcpNameNodeResponse.java      | 22 +++++++-
 .../crail/namenode/rpc/tcp/TcpRpcConnection.java   | 12 +++++
 .../java/org/apache/crail/rpc/RpcProtocol.java     |  7 ++-
 .../org/apache/crail/rpc/RpcRequestMessage.java    | 60 +++++++++++++++++++++-
 .../org/apache/crail/rpc/RpcResponseMessage.java   | 49 ++++++++++++++++++
 14 files changed, 256 insertions(+), 9 deletions(-)

diff --git 
a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java 
b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
index fc52f14..00eebd9 100644
--- a/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
+++ b/client/src/main/java/org/apache/crail/metadata/DataNodeStatistics.java
@@ -22,25 +22,29 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 public class DataNodeStatistics {
-       public static final int CSIZE = 12;
+       public static final int CSIZE = 14;
        
        private long serviceId;
        private int freeBlockCount;
+       private short status;
        
        public DataNodeStatistics(){
                this.serviceId = 0;
                this.freeBlockCount = 0;
+               this.status = 0;
        }
        
        public int write(ByteBuffer buffer){
                buffer.putLong(serviceId);
                buffer.putInt(freeBlockCount);
+               buffer.putShort(status);
                return CSIZE;
        }
        
        public void update(ByteBuffer buffer) throws UnknownHostException {
                this.serviceId = buffer.getLong();
                this.freeBlockCount = buffer.getInt();
+               this.status = buffer.getShort();
        }
 
        public int getFreeBlockCount() {
@@ -51,6 +55,14 @@ public class DataNodeStatistics {
                this.freeBlockCount = blockCount;
        }
 
+       public short getStatus() {
+               return this.status;
+       }
+
+       public void setStatus(short status) {
+               this.status = status;
+       }
+
        public void setStatistics(DataNodeStatistics statistics) {
                this.serviceId = statistics.getServiceId();
                this.freeBlockCount = statistics.getFreeBlockCount();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java 
b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
index d434fdb..91ae152 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcConnection.java
@@ -19,6 +19,7 @@
 package org.apache.crail.rpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import org.apache.crail.CrailNodeType;
 import org.apache.crail.metadata.BlockInfo;
@@ -59,6 +60,9 @@ public interface RpcConnection {
 
        public abstract RpcFuture<RpcPing> pingNameNode()
                        throws Exception;
+
+       public abstract RpcFuture<RpcRemoveDataNode> removeDataNode(
+                       InetAddress ipaddr, int port) throws Exception;
        
        public abstract void close() throws Exception;
        
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java 
b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
index f8d38f5..605193c 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.crail.rpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.crail.CrailNodeType;
@@ -135,6 +136,12 @@ public class RpcDispatcher implements RpcConnection {
        }
 
        @Override
+       public RpcFuture<RpcRemoveDataNode> removeDataNode(
+                       InetAddress ipaddr, int port) throws Exception {
+               return connections[0].removeDataNode(ipaddr, port);
+       }
+
+       @Override
        public void close() throws Exception {
                for (RpcConnection connection : connections){
                        connection.close();
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java 
b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
index 367e772..199f130 100644
--- a/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
+++ b/client/src/main/java/org/apache/crail/rpc/RpcErrors.java
@@ -55,7 +55,8 @@ public class RpcErrors {
        public static short ERR_DIR_LOCATION_AFFINITY_MISMATCH = 26;
        public static short ERR_ADD_BLOCK_FAILED = 27;
        public static short ERR_CREATE_FILE_BUG = 28;
-       
+       public static short ERR_DATANODE_STOP = 29;
+
        static {
                messages[ERR_OK] = "ERROR: No error, all fine";
                messages[ERR_UNKNOWN] = "ERROR: Unknown error";
diff --git a/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java 
b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
new file mode 100644
index 0000000..ad3db57
--- /dev/null
+++ b/client/src/main/java/org/apache/crail/rpc/RpcRemoveDataNode.java
@@ -0,0 +1,5 @@
+package org.apache.crail.rpc;
+
+public interface RpcRemoveDataNode extends RpcResponse {
+    public short getData();
+}
diff --git 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
index 26409f7..7b53855 100644
--- 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
+++ 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeConnection.java
@@ -19,6 +19,7 @@
 package org.apache.crail.namenode.rpc.darpc;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 import org.apache.crail.CrailNodeType;
 import org.apache.crail.conf.CrailConstants;
@@ -35,6 +36,7 @@ import org.apache.crail.rpc.RpcGetDataNode;
 import org.apache.crail.rpc.RpcGetFile;
 import org.apache.crail.rpc.RpcGetLocation;
 import org.apache.crail.rpc.RpcPing;
+import org.apache.crail.rpc.RpcRemoveDataNode;
 import org.apache.crail.rpc.RpcProtocol;
 import org.apache.crail.rpc.RpcRenameFile;
 import org.apache.crail.rpc.RpcRequestMessage;
@@ -268,6 +270,23 @@ public class DaRPCNameNodeConnection implements 
RpcConnection {
                
                return nameNodeFuture;  
        }
+
+       @Override
+       public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress ipaddr, 
int port) throws Exception {
+
+               RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq = new 
RpcRequestMessage.RemoveDataNodeReq(ipaddr, port);
+               DaRPCNameNodeRequest request = new 
DaRPCNameNodeRequest(removeDataNodeReq);
+               request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+
+               RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes = new 
RpcResponseMessage.RemoveDataNodeRes();
+               DaRPCNameNodeResponse response = new 
DaRPCNameNodeResponse(removeDataNodeRes);
+               
+               DaRPCFuture<DaRPCNameNodeRequest, DaRPCNameNodeResponse> future 
= issueRPC(request, response);
+               
+               DaRPCNameNodeFuture<RpcRemoveDataNode> nameNodeFuture = new 
DaRPCNameNodeFuture<RpcRemoveDataNode>(future, removeDataNodeRes);
+               
+               return nameNodeFuture;
+       }
        
        @Override
        public void close() throws Exception {
diff --git 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
index f87ba9d..f496edf 100644
--- 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
+++ 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeRequest.java
@@ -45,6 +45,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
        private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
        private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
        private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+       private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
 
        public DaRPCNameNodeRequest() {
                this.cmd = 0;
@@ -60,6 +61,7 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
                this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
                this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
                this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
+               this.removeDataNodeReq = new 
RpcRequestMessage.RemoveDataNodeReq();
        }
        
        public DaRPCNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
@@ -115,6 +117,11 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
                this.type = message.getType();
                this.pingNameNodeReq = message;
        }
+
+       public DaRPCNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq 
message) {
+               this.type = message.getType();
+               this.removeDataNodeReq = message;
+       }
        
        public void setCommand(short command) {
                this.cmd = command;
@@ -163,6 +170,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
                case RpcProtocol.REQ_PING_NAMENODE:
                        written += pingNameNodeReq.write(buffer);
                        break;
+               case RpcProtocol.REQ_REMOVE_DATANODE:
+                       written += removeDataNodeReq.write(buffer);
+                       break;
                }
                
                return written;
@@ -206,6 +216,9 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
                case RpcProtocol.REQ_PING_NAMENODE:
                        pingNameNodeReq.update(buffer);
                        break;
+               case RpcProtocol.REQ_REMOVE_DATANODE:
+                       removeDataNodeReq.update(buffer);
+                       break;
                }
        }
 
@@ -260,4 +273,8 @@ public class DaRPCNameNodeRequest implements DaRPCMessage {
        public RpcRequestMessage.PingNameNodeReq pingNameNode(){
                return this.pingNameNodeReq;
        }
+
+       public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+               return this.removeDataNodeReq;
+       }
 }
diff --git 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
index a4e75f4..bd1d84b 100644
--- 
a/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
+++ 
b/rpc-darpc/src/main/java/org/apache/crail/namenode/rpc/darpc/DaRPCNameNodeResponse.java
@@ -40,6 +40,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, 
RpcNameNodeState {
        private RpcResponseMessage.GetLocationRes getLocationRes;       
        private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
        private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+       private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
        
        public DaRPCNameNodeResponse() {
                this.type = 0;
@@ -54,6 +55,7 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, 
RpcNameNodeState {
                this.getLocationRes = new RpcResponseMessage.GetLocationRes();
                this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
                this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+               this.removeDataNodeRes = new 
RpcResponseMessage.RemoveDataNodeRes();
        }
        
        public DaRPCNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -100,6 +102,11 @@ public class DaRPCNameNodeResponse implements 
DaRPCMessage, RpcNameNodeState {
                this.type = message.getType();
                this.pingNameNodeRes = message;
        }
+
+       public DaRPCNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes 
message) {
+               this.type = message.getType();
+               this.removeDataNodeRes = message;
+       }
        
        public void setType(short type) throws Exception {
                this.type = type;
@@ -149,6 +156,11 @@ public class DaRPCNameNodeResponse implements 
DaRPCMessage, RpcNameNodeState {
                                throw new Exception("Response type not set");
                        }
                        break;
+               case RpcProtocol.RES_REMOVE_DATANODE:
+                       if (removeDataNodeRes == null){
+                               throw new Exception("Response type not set");
+                       }
+                       break;
                }               
        }       
 
@@ -188,7 +200,10 @@ public class DaRPCNameNodeResponse implements 
DaRPCMessage, RpcNameNodeState {
                        break;                  
                case RpcProtocol.RES_PING_NAMENODE:
                        written += pingNameNodeRes.write(buffer);
-                       break;                  
+                       break;          
+               case RpcProtocol.RES_REMOVE_DATANODE:
+                       written += removeDataNodeRes.write(buffer);
+                       break;          
                }
                
                return written;
@@ -235,6 +250,10 @@ public class DaRPCNameNodeResponse implements 
DaRPCMessage, RpcNameNodeState {
                        pingNameNodeRes.update(buffer);
                        pingNameNodeRes.setError(error);
                        break;          
+               case RpcProtocol.RES_REMOVE_DATANODE:
+                       removeDataNodeRes.update(buffer);
+                       removeDataNodeRes.setError(error);
+                       break;          
                }
        }
        
@@ -285,4 +304,8 @@ public class DaRPCNameNodeResponse implements DaRPCMessage, 
RpcNameNodeState {
        public RpcResponseMessage.PingNameNodeRes pingNameNode(){
                return this.pingNameNodeRes;
        }
+
+       public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+               return this.removeDataNodeRes;
+       }
 }
diff --git 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
index d22e09a..37e5503 100644
--- 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
+++ 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeRequest.java
@@ -45,6 +45,7 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
        private RpcRequestMessage.GetDataNodeReq getDataNodeReq;
        private RpcRequestMessage.DumpNameNodeReq dumpNameNodeReq;
        private RpcRequestMessage.PingNameNodeReq pingNameNodeReq;
+       private RpcRequestMessage.RemoveDataNodeReq removeDataNodeReq;
 
        public TcpNameNodeRequest() {
                this.cmd = 0;
@@ -60,7 +61,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
                this.dumpNameNodeReq = new RpcRequestMessage.DumpNameNodeReq();
                this.pingNameNodeReq = new RpcRequestMessage.PingNameNodeReq();
                this.getDataNodeReq = new RpcRequestMessage.GetDataNodeReq();
-       }       
+               this.removeDataNodeReq = new 
RpcRequestMessage.RemoveDataNodeReq();
+       }
        
        public TcpNameNodeRequest(RpcRequestMessage.CreateFileReq message) {
                this.type = message.getType();
@@ -115,7 +117,12 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
                this.type = message.getType();
                this.pingNameNodeReq = message;
        }
-       
+
+       public TcpNameNodeRequest(RpcRequestMessage.RemoveDataNodeReq message) {
+               this.type = message.getType();
+               this.removeDataNodeReq = message;
+       }
+
        public void setCommand(short command) {
                this.cmd = command;
        }       
@@ -163,6 +170,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
                case RpcProtocol.REQ_PING_NAMENODE:
                        written += pingNameNodeReq.write(buffer);
                        break;
+               case RpcProtocol.REQ_REMOVE_DATANODE:
+                       written += removeDataNodeReq.write(buffer);
+                       break;
                }
                
                return written;
@@ -206,6 +216,9 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
                case RpcProtocol.REQ_PING_NAMENODE:
                        pingNameNodeReq.update(buffer);
                        break;
+               case RpcProtocol.REQ_REMOVE_DATANODE:
+                       removeDataNodeReq.update(buffer);
+                       break;
                }
        }
 
@@ -260,4 +273,8 @@ public class TcpNameNodeRequest extends RpcRequestMessage 
implements NaRPCMessag
        public RpcRequestMessage.PingNameNodeReq pingNameNode(){
                return this.pingNameNodeReq;
        }
+
+       public RpcRequestMessage.RemoveDataNodeReq removeDataNode(){
+               return this.removeDataNodeReq;
+       }
 }
diff --git 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
index 73912df..8a01a19 100644
--- 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
+++ 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpNameNodeResponse.java
@@ -43,6 +43,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage 
implements RpcNameNo
        private RpcResponseMessage.GetLocationRes getLocationRes;       
        private RpcResponseMessage.GetDataNodeRes getDataNodeRes;
        private RpcResponseMessage.PingNameNodeRes pingNameNodeRes;
+       private RpcResponseMessage.RemoveDataNodeRes removeDataNodeRes;
        
        public TcpNameNodeResponse() {
                this.type = 0;
@@ -56,6 +57,7 @@ public class TcpNameNodeResponse extends RpcResponseMessage 
implements RpcNameNo
                this.getLocationRes = new RpcResponseMessage.GetLocationRes();
                this.getDataNodeRes = new RpcResponseMessage.GetDataNodeRes();
                this.pingNameNodeRes = new RpcResponseMessage.PingNameNodeRes();
+               this.removeDataNodeRes = new 
RpcResponseMessage.RemoveDataNodeRes();
        }
        
        public TcpNameNodeResponse(RpcResponseMessage.VoidRes message) {
@@ -102,6 +104,11 @@ public class TcpNameNodeResponse extends 
RpcResponseMessage implements RpcNameNo
                this.type = message.getType();
                this.pingNameNodeRes = message;
        }
+
+       public TcpNameNodeResponse(RpcResponseMessage.RemoveDataNodeRes 
message) {
+               this.type = message.getType();
+               this.removeDataNodeRes = message;
+       }
        
        public void setType(short type) throws Exception {
                this.type = type;
@@ -143,7 +150,10 @@ public class TcpNameNodeResponse extends 
RpcResponseMessage implements RpcNameNo
                        break;                  
                case RpcProtocol.RES_PING_NAMENODE:
                        written += pingNameNodeRes.write(buffer);
-                       break;                  
+                       break;
+               case RpcProtocol.RES_REMOVE_DATANODE:
+                       written += removeDataNodeRes.write(buffer);
+                       break;
                }
                
                return written;
@@ -189,7 +199,11 @@ public class TcpNameNodeResponse extends 
RpcResponseMessage implements RpcNameNo
                case RpcProtocol.RES_PING_NAMENODE:
                        pingNameNodeRes.update(buffer);
                        pingNameNodeRes.setError(error);
-                       break;          
+                       break;
+               case RpcProtocol.RES_REMOVE_DATANODE:
+                       removeDataNodeRes.update(buffer);
+                       removeDataNodeRes.setError(error);
+                       break;
                }
        }
        
@@ -240,4 +254,8 @@ public class TcpNameNodeResponse extends RpcResponseMessage 
implements RpcNameNo
        public RpcResponseMessage.PingNameNodeRes pingNameNode(){
                return this.pingNameNodeRes;
        }
+
+       public RpcResponseMessage.RemoveDataNodeRes removeDataNode(){
+               return this.removeDataNodeRes;
+       }
 }
diff --git 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
index decdf1c..6f96e4f 100644
--- 
a/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
+++ 
b/rpc-narpc/src/main/java/org/apache/crail/namenode/rpc/tcp/TcpRpcConnection.java
@@ -31,6 +31,7 @@ import org.apache.crail.utils.CrailUtils;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.net.InetAddress;
 
 public class TcpRpcConnection implements RpcConnection {
        static private final Logger LOG = CrailUtils.getLogger();
@@ -184,4 +185,15 @@ public class TcpRpcConnection implements RpcConnection {
                return new TcpFuture<RpcPing>(future, resp);
        }
 
+       public RpcFuture<RpcRemoveDataNode> removeDataNode(InetAddress addr, 
int port) throws Exception {
+               RpcRequestMessage.RemoveDataNodeReq req = new 
RpcRequestMessage.RemoveDataNodeReq(addr, port);
+               RpcResponseMessage.RemoveDataNodeRes resp = new 
RpcResponseMessage.RemoveDataNodeRes();
+
+               TcpNameNodeRequest request = new TcpNameNodeRequest(req);
+               TcpNameNodeResponse response = new TcpNameNodeResponse(resp);
+               request.setCommand(RpcProtocol.CMD_REMOVE_DATANODE);
+               NaRPCFuture<TcpNameNodeRequest, TcpNameNodeResponse> future = 
endpoint.issueRequest(request, response);
+               return new TcpFuture<RpcRemoveDataNode>(future, resp);
+       }
+
 }
\ No newline at end of file
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java 
b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
index d5a339d..10c32d3 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcProtocol.java
@@ -40,6 +40,7 @@ public class RpcProtocol extends RpcErrors {
        public static final short CMD_DUMP_NAMENODE = 10;
        public static final short CMD_PING_NAMENODE = 11;
        public static final short CMD_GET_DATANODE = 12;
+       public static final short CMD_REMOVE_DATANODE = 13;
        
        //request types
        public static final short REQ_CREATE_FILE = 1;  
@@ -53,7 +54,8 @@ public class RpcProtocol extends RpcErrors {
        public static final short REQ_DUMP_NAMENODE = 10;
        public static final short REQ_PING_NAMENODE = 11;
        public static final short REQ_GET_DATANODE = 12;
-       
+       public static final short REQ_REMOVE_DATANODE = 13;
+
        //response types
        public static final short RES_VOID = 1;
        public static final short RES_CREATE_FILE = 2;
@@ -64,6 +66,7 @@ public class RpcProtocol extends RpcErrors {
        public static final short RES_GET_LOCATION = 7;
        public static final short RES_PING_NAMENODE = 9;
        public static final short RES_GET_DATANODE = 10;
+       public static final short RES_REMOVE_DATANODE = 11;
        
        
        static {
@@ -79,6 +82,7 @@ public class RpcProtocol extends RpcErrors {
                requestTypes[CMD_DUMP_NAMENODE] = REQ_DUMP_NAMENODE;
                requestTypes[CMD_PING_NAMENODE] = REQ_PING_NAMENODE;    
                requestTypes[CMD_GET_DATANODE] = REQ_GET_DATANODE;
+               requestTypes[CMD_REMOVE_DATANODE] = REQ_REMOVE_DATANODE;
                
                responseTypes[0] = 0;
                responseTypes[CMD_CREATE_FILE] = RES_CREATE_FILE;
@@ -92,6 +96,7 @@ public class RpcProtocol extends RpcErrors {
                responseTypes[CMD_DUMP_NAMENODE] = RES_VOID;
                responseTypes[CMD_PING_NAMENODE] = RES_PING_NAMENODE;   
                responseTypes[CMD_GET_DATANODE] = RES_GET_DATANODE;
+               responseTypes[CMD_REMOVE_DATANODE] = RES_REMOVE_DATANODE;
        }
        
 
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java 
b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
index be043a2..7d1f414 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcRequestMessage.java
@@ -18,6 +18,8 @@
 
 package org.apache.crail.rpc;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
@@ -571,7 +573,63 @@ public class RpcRequestMessage {
                public void update(ByteBuffer buffer) {
                        op = buffer.getInt();
                }               
-       }       
+       }
+
+       public static class RemoveDataNodeReq implements 
RpcProtocol.NameNodeRpcMessage {
+               private InetAddress ipAddr;
+               private int port;
+
+               public RemoveDataNodeReq(){
+                       this.ipAddr = null;
+                       this.port = 0;
+               }
+
+               public RemoveDataNodeReq(InetAddress addr, int port){
+                       this.ipAddr = addr;
+                       this.port = port;
+               }
+
+               public int size() {
+                       //  sizeof(ip-addr) + sizeof(port)
+                       return 4 + Integer.BYTES;
+               }
+
+               public short getType(){
+                       return RpcProtocol.REQ_REMOVE_DATANODE;
+               }
+
+               public int write(ByteBuffer buffer) throws IOException {
+                       int size = size();
+
+                       checkSize(buffer.remaining());
+
+                       buffer.put(this.getIPAddress().getAddress());
+                       buffer.putInt(this.port());
+                       return size;
+               }
+
+               private void checkSize(int remaining) throws IOException {
+                       if(this.size() > remaining)
+                               throw new IOException("Only " + remaining + " 
remaining bytes stored in buffer, however " + this.size() + " bytes are 
required");
+               }
+
+               public void update(ByteBuffer buffer) throws IOException {
+                       checkSize(buffer.remaining());
+
+                       byte[] b = new byte[4];
+                       buffer.get(b);
+                       this.ipAddr = InetAddress.getByAddress(b);
+                       this.port = buffer.getInt();
+               }
+
+               public InetAddress getIPAddress(){
+                       return this.ipAddr;
+               }
+
+               public int port(){
+                       return this.port;
+               }
+       }
        
 
 }
diff --git a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java 
b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
index d4175af..32697ea 100644
--- a/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
+++ b/rpc/src/main/java/org/apache/crail/rpc/RpcResponseMessage.java
@@ -598,6 +598,10 @@ public class RpcResponseMessage {
                public void setFreeBlockCount(int blockCount) {
                        this.statistics.setFreeBlockCount(blockCount);
                }
+
+               public void setStatus(short status) {
+                       this.statistics.setStatus(status);
+               }
                
                public short getError(){
                        return 0;
@@ -652,4 +656,49 @@ public class RpcResponseMessage {
                        this.error = error;
                }
        }
+
+       public static class RemoveDataNodeRes implements 
RpcProtocol.NameNodeRpcMessage, RpcRemoveDataNode {
+               public static int CSIZE = Short.BYTES;
+
+               private short data;
+               private short error;
+
+               public RemoveDataNodeRes() {
+                       this.data = 0;
+                       this.error = 0;
+               }
+
+               public int size() {
+                       return CSIZE;
+               }
+
+               public short getType(){
+                       return RpcProtocol.RES_REMOVE_DATANODE;
+               }
+
+               public int write(ByteBuffer buffer) {
+                       buffer.putShort(data);
+                       return CSIZE;
+               }
+
+               public void update(ByteBuffer buffer) {
+                       data = buffer.getShort();
+               }
+
+               public short getData(){
+                       return data;
+               }
+
+               public void setData(short data) {
+                       this.data = data;
+               }
+
+               public short getError(){
+                       return error;
+               }
+
+               public void setError(short error) {
+                       this.error = error;
+               }
+       }
 }

Reply via email to