Github user animeshtrivedi commented on a diff in the pull request:

    https://github.com/apache/incubator-crail/pull/16#discussion_r180317810
  
    --- Diff: 
storage-nvmf/src/main/java/org/apache/crail/storage/nvmf/client/NvmfStorageEndpoint.java
 ---
    @@ -19,208 +19,229 @@
     
     package org.apache.crail.storage.nvmf.client;
     
    -import com.ibm.disni.nvmef.NvmeCommand;
    -import com.ibm.disni.nvmef.NvmeEndpoint;
    -import com.ibm.disni.nvmef.NvmeEndpointGroup;
    -import com.ibm.disni.nvmef.spdk.IOCompletion;
    -
    +import com.ibm.jnvmf.*;
     import org.apache.crail.CrailBuffer;
    +import org.apache.crail.CrailBufferCache;
    +import org.apache.crail.CrailStatistics;
     import org.apache.crail.conf.CrailConstants;
    -import org.apache.crail.memory.BufferCache;
     import org.apache.crail.metadata.BlockInfo;
    +import org.apache.crail.metadata.DataNodeInfo;
     import org.apache.crail.storage.StorageEndpoint;
     import org.apache.crail.storage.StorageFuture;
    -import org.apache.crail.storage.nvmf.NvmfBufferCache;
     import org.apache.crail.storage.nvmf.NvmfStorageConstants;
     import org.apache.crail.utils.CrailUtils;
     import org.slf4j.Logger;
     
     import java.io.IOException;
    +import java.net.InetAddress;
     import java.net.InetSocketAddress;
    -import java.net.URI;
    -import java.net.URISyntaxException;
    -import java.util.concurrent.*;
    +import java.util.List;
    +import java.util.Queue;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     public class NvmfStorageEndpoint implements StorageEndpoint {
        private static final Logger LOG = CrailUtils.getLogger();
     
    -   private final InetSocketAddress inetSocketAddress;
    -   private final NvmeEndpoint endpoint;
    -   private final int sectorSize;
    -   private final BufferCache cache;
    -   private final BlockingQueue<NvmeCommand> freeCommands;
    -   private final NvmeCommand[] commands;
    -   private final NvmfStorageFuture[] futures;
    -   private final ThreadLocal<long[]> completed;
    -   private final int ioQeueueSize;
    -
    -   public NvmfStorageEndpoint(NvmeEndpointGroup group, InetSocketAddress 
inetSocketAddress) throws IOException {
    -           this.inetSocketAddress = inetSocketAddress;
    -           endpoint = group.createEndpoint();
    +   private final Controller controller;
    +   private final IoQueuePair queuePair;
    +   private final int lbaDataSize;
    +   private final long namespaceCapacity;
    +   private final NvmfRegisteredBufferCache registeredBufferCache;
    +   private final NvmfStagingBufferCache stagingBufferCache;
    +   private final CrailStatistics statistics;
    +
    +   private final Queue<NvmWriteCommand> writeCommands;
    +   private final Queue<NvmReadCommand> readCommands;
    +
    +   private final AtomicInteger outstandingOperations;
    +
    +   public NvmfStorageEndpoint(Nvme nvme, DataNodeInfo info, 
CrailStatistics statistics,
    +                                                      CrailBufferCache 
bufferCache) throws IOException {
    +           InetSocketAddress inetSocketAddress = new InetSocketAddress(
    +                           InetAddress.getByAddress(info.getIpAddress()), 
info.getPort());
    +           // XXX FIXME: nsid from datanodeinfo
    +           NvmfTransportId transportId = new 
NvmfTransportId(inetSocketAddress,
    +                           new 
NvmeQualifiedName(NvmfStorageConstants.NQN.toString() + info.getPort()));
    +           LOG.info("Connecting to NVMf target at " + 
transportId.toString());
    +           controller = nvme.connect(transportId);
    +           controller.getControllerConfiguration().setEnable(true);
    +           controller.syncConfiguration();
                try {
    -                   URI url = new URI("nvmef://" + 
inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() +
    -                                   "/0/" + NvmfStorageConstants.NAMESPACE 
+ "?subsystem=nqn.2016-06.io.spdk:cnode1");
    -                   LOG.info("Connecting to " + url.toString());
    -                   endpoint.connect(url);
    -           } catch (URISyntaxException e) {
    -                   //FIXME
    -                   e.printStackTrace();
    +                   controller.waitUntilReady();
    +           } catch (TimeoutException e) {
    +                   throw new IOException(e);
                }
    -           sectorSize = endpoint.getSectorSize();
    -           cache = new NvmfBufferCache();
    -           ioQeueueSize = endpoint.getIOQueueSize();
    -           freeCommands = new 
ArrayBlockingQueue<NvmeCommand>(ioQeueueSize);
    -           commands = new NvmeCommand[ioQeueueSize];
    -           for (int i = 0; i < ioQeueueSize; i++) {
    -                   NvmeCommand command = endpoint.newCommand();
    -                   command.setId(i);
    -                   commands[i] = command;
    -                   freeCommands.add(command);
    +           IdentifyControllerData identifyControllerData = 
controller.getIdentifyControllerData();
    +           if (CrailConstants.SLICE_SIZE > 
identifyControllerData.getMaximumDataTransferSize().toInt()) {
    +                   throw new 
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY + " > max transfer size 
(" +
    +                                   
identifyControllerData.getMaximumDataTransferSize() + ")");
                }
    -           futures = new NvmfStorageFuture[ioQeueueSize];
    -           completed = new ThreadLocal<long[]>() {
    -                   public long[] initialValue() {
    -                           return new long[ioQeueueSize];
    +           List<Namespace> namespaces = controller.getActiveNamespaces();
    +           //TODO: poll nsid in datanodeinfo
    +           NamespaceIdentifier namespaceIdentifier = new 
NamespaceIdentifier(1);
    +           Namespace namespace = null;
    +           for (Namespace n : namespaces) {
    +                   if (n.getIdentifier().equals(namespaceIdentifier)) {
    +                           namespace = n;
    +                           break;
                        }
    -           };
    +           }
    +           if (namespace == null) {
    +                   throw new IllegalArgumentException("No namespace with 
id " + namespaceIdentifier +
    +                                   " at controller " + 
transportId.toString());
    +           }
    +           IdentifyNamespaceData identifyNamespaceData = 
namespace.getIdentifyNamespaceData();
    +           lbaDataSize = 
identifyNamespaceData.getFormattedLbaSize().getLbaDataSize().toInt();
    +           if (CrailConstants.SLICE_SIZE % lbaDataSize != 0) {
    +                   throw new 
IllegalArgumentException(CrailConstants.SLICE_SIZE_KEY +
    +                                   " is not a multiple of LBA data size (" 
+ lbaDataSize + ")");
    +           }
    +           namespaceCapacity = 
identifyNamespaceData.getNamespaceCapacity() * lbaDataSize;
    +           this.queuePair = 
controller.createIoQueuePair(NvmfStorageConstants.QUEUE_SIZE, 0, 0,
    +                           SubmissionQueueEntry.SIZE);
    +
    +           this.writeCommands = new 
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +           this.readCommands = new 
ArrayBlockingQueue<>(NvmfStorageConstants.QUEUE_SIZE);
    +           for(int i = 0; i < NvmfStorageConstants.QUEUE_SIZE; i++) {
    +                   NvmWriteCommand writeCommand = new 
NvmWriteCommand(queuePair);
    +                   writeCommand.setSendInline(true);
    +                   
writeCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +                   writeCommands.add(writeCommand);
    +                   NvmReadCommand readCommand = new 
NvmReadCommand(queuePair);
    +                   readCommand.setSendInline(true);
    +                   
readCommand.getCommandCapsule().getSubmissionQueueEntry().setNamespaceIdentifier(namespaceIdentifier);
    +                   readCommands.add(readCommand);
    +           }
    +           this.registeredBufferCache = new 
NvmfRegisteredBufferCache(queuePair);
    +           this.outstandingOperations = new AtomicInteger(0);
    +           this.stagingBufferCache = new 
NvmfStagingBufferCache(bufferCache,
    +                           NvmfStorageConstants.STAGING_CACHE_SIZE, 
getLBADataSize());
    +           this.statistics = statistics;
    +   }
    +
    +   public void keepAlive() throws IOException {
    +           controller.keepAlive();
    +   }
    +
    +   public int getLBADataSize() {
    +           return lbaDataSize;
        }
     
    -   public int getSectorSize() {
    -           return sectorSize;
    +   public long getNamespaceCapacity() {
    +           return namespaceCapacity;
        }
     
        enum Operation {
                WRITE,
    -           READ;
    +           READ
        }
     
    -   public StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo 
remoteMr, long remoteOffset)
    -                   throws IOException, InterruptedException {
    -           int length = buffer.remaining();
    -           if (length > CrailConstants.BLOCK_SIZE){
    -                   throw new IOException("write size too large " + length);
    -           }
    -           if (length <= 0){
    -                   throw new IOException("write size too small, len " + 
length);
    -           }
    -           if (buffer.position() < 0){
    -                   throw new IOException("local offset too small " + 
buffer.position());
    -           }
    -           if (remoteOffset < 0){
    -                   throw new IOException("remote offset too small " + 
remoteOffset);
    -           }
    +   void putOperation() {
    +           outstandingOperations.decrementAndGet();
    +   }
     
    -           if (remoteMr.getAddr() + remoteOffset + length > 
endpoint.getNamespaceSize()){
    -                   long tmpAddr = remoteMr.getAddr() + remoteOffset + 
length;
    -                   throw new IOException("remote fileOffset + remoteOffset 
+ len = " + tmpAddr + " - size = " +
    -                                   endpoint.getNamespaceSize());
    +   private boolean tryGetOperation() {
    +           int outstandingOperationsOld = outstandingOperations.get();
    +           if (outstandingOperationsOld < NvmfStorageConstants.QUEUE_SIZE) 
{
    +                   return 
outstandingOperations.compareAndSet(outstandingOperationsOld, 
outstandingOperationsOld + 1);
                }
    +           return false;
    +   }
     
    -//         LOG.info("op = " + op.name() +
    -//                         ", position = " + buffer.position() +
    -//                         ", localOffset = " + buffer.position() +
    -//                         ", remoteOffset = " + remoteOffset +
    -//                         ", remoteAddr = " + remoteMr.getAddr() +
    -//                         ", length = " + length);
    -
    -           NvmeCommand command = freeCommands.poll();
    -           while (command == null) {
    -                   poll();
    -                   command = freeCommands.poll();
    -           }
    +   private static int divCeil(int a, int b) {
    +           return (a + b - 1) / b;
    +   }
     
    -           boolean aligned = 
NvmfStorageUtils.namespaceSectorOffset(sectorSize, remoteOffset) == 0
    -                           && 
NvmfStorageUtils.namespaceSectorOffset(sectorSize, length) == 0;
    -           long lba = NvmfStorageUtils.linearBlockAddress(remoteMr, 
remoteOffset, sectorSize);
    -           StorageFuture future = null;
    -           if (aligned) {
    -//                 LOG.info("aligned");
    -                   
command.setBuffer(buffer.getByteBuffer()).setLinearBlockAddress(lba);
    -                   switch(op) {
    -                           case READ:
    -                                   command.read();
    -                                   break;
    -                           case WRITE:
    -                                   command.write();
    -                                   break;
    -                   }
    -                   future = futures[(int)command.getId()] = new 
NvmfStorageFuture(this, length);
    -                   command.execute();
    -           } else {
    -//                 LOG.info("unaligned");
    -                   long alignedLength = 
NvmfStorageUtils.alignLength(sectorSize, remoteOffset, length);
    +   private int getNumLogicalBlocks(CrailBuffer buffer) {
    +           return divCeil(buffer.remaining(), getLBADataSize());
    +   }
     
    -                   CrailBuffer stagingBuffer = cache.allocateBuffer();
    -                   stagingBuffer.limit((int)alignedLength);
    +   StorageFuture Op(Operation op, CrailBuffer buffer, BlockInfo blockInfo, 
long remoteOffset) throws InterruptedException, IOException {
    +           assert blockInfo.getAddr() + remoteOffset + buffer.remaining() 
<= getNamespaceCapacity();
    --- End diff --
    
    FYI: Java asserts are disabled by default 
(https://docs.oracle.com/javase/8/docs/technotes/guides/language/assert.html#enable-disable).
 These checks will not catch anything unless the JVM is run with the `-ea` 
flag. 


---

Reply via email to