NvmfStorageClient: keep alive thread

Start keep alive thread which sends keep alive message to
each controller every 110s (timeout 120s). Otherwise thread sleeps.

Signed-off-by: Jonas Pfefferle <peppe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/ac9e2990
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/ac9e2990
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/ac9e2990

Branch: refs/heads/master
Commit: ac9e2990703ab0d235548851b327c985f453cbcc
Parents: 3845c27
Author: Jonas Pfefferle <peppe...@apache.org>
Authored: Mon Apr 9 15:16:49 2018 +0200
Committer: Jonas Pfefferle <peppe...@apache.org>
Committed: Tue Apr 10 13:19:02 2018 +0200

----------------------------------------------------------------------
 .../crail/storage/nvmf/NvmfStorageClient.java   | 43 ++++++++++++++++++--
 .../nvmf/client/NvmfStorageEndpoint.java        |  4 ++
 2 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/ac9e2990/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
index d9dd976..cccf596 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/NvmfStorageClient.java
@@ -34,18 +34,44 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
 public class NvmfStorageClient implements StorageClient {
        private static final Logger LOG = CrailUtils.getLogger();
        private static Nvme nvme;
        private boolean initialized;
-       private List<StorageEndpoint> endpoints;
+       private volatile boolean closing;
+       private final Thread keepAliveThread;
+       private List<NvmfStorageEndpoint> endpoints;
        private CrailStatistics statistics;
        private CrailBufferCache bufferCache;
 
        public NvmfStorageClient() {
                this.initialized = false;
                this.endpoints = new CopyOnWriteArrayList<>();
+               this.closing = false;
+               this.keepAliveThread = new Thread(() -> {
+                       while (!closing) {
+                               for (NvmfStorageEndpoint endpoint : endpoints) {
+                                       try {
+                                               endpoint.keepAlive();
+                                       } catch (IOException e) {
+                                               e.printStackTrace();
+                                               return;
+                                       }
+                               }
+                               /* We use the default keep alive timer of 120s 
in jNVMf */
+                               try {
+                                       
Thread.sleep(TimeUnit.MILLISECONDS.convert(110, TimeUnit.SECONDS));
+                               } catch (InterruptedException e) {
+                                       return;
+                               }
+                       }
+               });
+       }
+
+       boolean isValid() {
+               return keepAliveThread.isAlive();
        }
 
        public void init(CrailStatistics statistics, CrailBufferCache 
bufferCache, CrailConfiguration crailConfiguration,
@@ -58,6 +84,7 @@ public class NvmfStorageClient implements StorageClient {
                this.bufferCache = bufferCache;
                LOG.info("Initialize Nvmf storage client");
                NvmfStorageConstants.parseCmdLine(crailConfiguration, args);
+               keepAliveThread.start();
        }
 
        public void printConf(Logger logger) {
@@ -72,14 +99,22 @@ public class NvmfStorageClient implements StorageClient {
        }
 
        public synchronized StorageEndpoint createEndpoint(DataNodeInfo info) 
throws IOException {
-               StorageEndpoint endpoint = new 
NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
+               if (!isValid()) {
+                       throw new IOException("Storage client state not valid");
+               }
+               NvmfStorageEndpoint endpoint = new 
NvmfStorageEndpoint(getEndpointGroup(), info, statistics, bufferCache);
                endpoints.add(endpoint);
                return endpoint;
        }
 
        public void close() throws Exception {
-               for (StorageEndpoint endpoint : endpoints) {
-                       endpoint.close();
+               if (!closing) {
+                       closing = true;
+                       keepAliveThread.interrupt();
+                       keepAliveThread.join();
+                       for (StorageEndpoint endpoint : endpoints) {
+                               endpoint.close();
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/ac9e2990/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
----------------------------------------------------------------------
diff --git 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
index 1661349..c9b17de 100644
--- 
a/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
+++ 
b/storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
@@ -121,6 +121,10 @@ public class NvmfStorageEndpoint implements 
StorageEndpoint {
                this.statistics = statistics;
        }
 
+       public void keepAlive() throws IOException {
+               controller.keepAlive();
+       }
+
        public int getLBADataSize() {
                return lbaDataSize;
        }

Reply via email to