This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch nvmlogging
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2c0cf11cf26095359a03db94fd4fe35fa63397a6
Author: mdf369 <[email protected]>
AuthorDate: Tue Jan 14 10:35:49 2020 +0800

    add NVMBinaryTVList
---
 .../db/nvm/exception/NVMSpaceManagerException.java |  14 ++
 .../iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java |   8 +-
 .../iotdb/db/nvm/space/NVMBinaryDataSpace.java     |  62 ++++++++
 .../apache/iotdb/db/nvm/space/NVMSpaceManager.java |  52 +++----
 .../db/utils/datastructure/AbstractTVList.java     |   2 +-
 .../iotdb/db/utils/datastructure/BinaryTVList.java |   4 +
 .../db/utils/datastructure/NVMBinaryTVList.java    | 158 +++++++++++++++++++++
 .../iotdb/db/utils/datastructure/NVMTVList.java    |   3 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 9 files changed, 274 insertions(+), 30 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java
 
b/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java
new file mode 100644
index 0000000..891acc0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java
@@ -0,0 +1,14 @@
+package org.apache.iotdb.db.nvm.exception;
+
+import org.apache.iotdb.db.exception.ProcessException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class NVMSpaceManagerException extends ProcessException {
+
+  private static final long serialVersionUID = 3502239072309147687L;
+
+  public NVMSpaceManagerException(String message) {
+    super(message);
+    errorCode = TSStatusCode.NVMSPACE_MANAGER_EROOR.getStatusCode();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
 
b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
index e992781..019bb27 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java
@@ -3,9 +3,9 @@ package org.apache.iotdb.db.nvm.rescon;
 import java.util.ArrayDeque;
 import java.util.EnumMap;
 import org.apache.iotdb.db.nvm.PerfMonitor;
+import org.apache.iotdb.db.nvm.exception.NVMSpaceManagerException;
 import org.apache.iotdb.db.nvm.space.NVMDataSpace;
 import org.apache.iotdb.db.nvm.space.NVMSpaceManager;
-import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class NVMPrimitiveArrayPool {
@@ -41,7 +41,11 @@ public class NVMPrimitiveArrayPool {
 
     long size = NVMSpaceManager.getPrimitiveTypeByteSize(dataType);
     if (nvmSpace == null) {
-      nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * 
ARRAY_SIZE, dataType, isTime);
+      try {
+        nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * 
ARRAY_SIZE, dataType, isTime);
+      } catch (NVMSpaceManagerException e) {
+        // TODO
+      }
     }
 
     PerfMonitor.add("NVM.getDataList" + (isTime ? "Time" : "Value"), 
System.currentTimeMillis() - time);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java 
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java
new file mode 100644
index 0000000..abde477
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java
@@ -0,0 +1,62 @@
+package org.apache.iotdb.db.nvm.space;
+
+import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
+
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class NVMBinaryDataSpace extends NVMDataSpace {
+
+  private int cacheSize;
+  private Binary[] cachedBinaries;
+
+  NVMBinaryDataSpace(long offset, long size, ByteBuffer byteBuffer, int index, 
boolean recover) {
+    super(offset, size, byteBuffer, index, TSDataType.TEXT, false);
+
+    cacheSize = 0;
+    cachedBinaries = new Binary[ARRAY_SIZE];
+    if (recover) {
+      recoverCache();
+    }
+  }
+
+  private void recoverCache() {
+    int size = byteBuffer.getInt();
+    cacheSize = size;
+    for (int i = 0; i < size; i++) {
+      int len = byteBuffer.getInt();
+      byte[] bytes = new byte[len];
+      byteBuffer.get(bytes);
+      cachedBinaries[i] = new Binary(bytes);
+    }
+  }
+
+  @Override
+  public int getUnitNum() {
+    return cachedBinaries.length;
+  }
+
+  @Override
+  public Object getData(int index) {
+    return cachedBinaries[index];
+  }
+
+  @Override
+  public void setData(int index, Object object) {
+    // TODO: arbitrary index is not supported
+    Binary binary = (Binary) object;
+    cachedBinaries[index] = binary;
+    if (index >= cacheSize) {
+      byteBuffer.putInt(0, index);
+      cacheSize = index;
+    }
+    byteBuffer.putInt(binary.getLength());
+    byteBuffer.put(binary.getValues());
+  }
+
+  @Override
+  public Object toArray() {
+    return cachedBinaries;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java 
b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
index d29a62f..7ae0f4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java
@@ -5,12 +5,12 @@ import static 
org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.nvm.exception.NVMSpaceManagerException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -23,6 +23,7 @@ public class NVMSpaceManager {
 
   private static final String NVM_FILE_NAME = "nvmFile";
   public static final int NVMSPACE_NUM_MAX = 1000000;
+  private static final int TEXT_AVERAGE_SIZE_IN_BYTES = 100;
 
   private final static NVMSpaceManager INSTANCE = new NVMSpaceManager();
 
@@ -65,26 +66,33 @@ public class NVMSpaceManager {
     return nvmSpace;
   }
 
-  public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType 
dataType, boolean isTime) {
-    checkIsFull();
+  public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType 
dataType, boolean isTime)
+      throws NVMSpaceManagerException {
+    checkIsNVMFull(size);
 
     try {
       logger.trace("Try to allocate NVMDataSpace from {} to {}", curOffset, 
curOffset + size);
       int index = curDataSpaceIndex.getAndIncrement();
-      NVMDataSpace nvmSpace = new NVMDataSpace(
-          curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), 
index, dataType, isTime);
+      NVMDataSpace nvmSpace;
+      if (dataType == TSDataType.TEXT) {
+        nvmSpace = new NVMBinaryDataSpace(curOffset, size, 
nvmFileChannel.map(MAP_MODE, curOffset, size),
+            index, false);
+      } else {
+        nvmSpace = new NVMDataSpace(curOffset, size, 
nvmFileChannel.map(MAP_MODE, curOffset, size),
+            index, dataType, isTime);
+      }
       curOffset += size;
       return nvmSpace;
     } catch (IOException e) {
-      // TODO deal with error
       logger.error("Fail to allocate {} nvm space at {}.", size, curOffset);
-      e.printStackTrace();
-      return null;
+      throw new NVMSpaceManagerException(e.getMessage());
     }
   }
 
-  private void checkIsFull() {
-    // TODO
+  private void checkIsNVMFull(long sizeToAllocate) throws 
NVMSpaceManagerException {
+    if (curOffset + sizeToAllocate > nvmSize) {
+      throw new NVMSpaceManagerException("NVM space is used up, can't allocate 
more. (total: " + nvmSize + ", used: " + curOffset + ", to allocate: " + 
sizeToAllocate + ")");
+    }
   }
 
   public NVMDataSpace getNVMDataSpaceByIndex(int spaceIndex) throws 
IOException {
@@ -100,7 +108,14 @@ public class NVMSpaceManager {
 
   private synchronized NVMDataSpace recoverData(long offset, long size, int 
index, TSDataType dataType) throws IOException {
     logger.trace("Try to recover NVMSpace from {} to {}", offset, offset + 
size);
-    NVMDataSpace nvmSpace = new NVMDataSpace(offset, size, 
nvmFileChannel.map(MAP_MODE, offset, size), index, dataType, false);
+    NVMDataSpace nvmSpace;
+    if (dataType == TSDataType.TEXT) {
+      nvmSpace = new NVMBinaryDataSpace(offset, size, 
nvmFileChannel.map(MAP_MODE, offset, size),
+          index, true);
+    } else {
+     nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, 
offset, size), index, dataType,
+          false);
+    }
     return nvmSpace;
   }
 
@@ -127,24 +142,11 @@ public class NVMSpaceManager {
         size = Double.BYTES;
         break;
       case TEXT:
-        // TODO
+        size = TEXT_AVERAGE_SIZE_IN_BYTES;
         break;
       default:
         throw new UnSupportedDataTypeException("DataType: " + dataType);
     }
     return size;
   }
-
-  public static void main(String[] args) throws IOException {
-    String nvmDir = IoTDBDescriptor.getInstance().getConfig().getNvmDir();
-    String nvmFilePath = nvmDir + File.separatorChar + NVM_FILE_NAME;
-    File nvmDirFile = FSFactoryProducer.getFSFactory().getFile(nvmDir);
-    nvmDirFile.mkdirs();
-    FileChannel nvmFileChannel = new RandomAccessFile(nvmFilePath, 
"rw").getChannel();
-
-    ByteBuffer byteBuffer = nvmFileChannel.map(MapMode.READ_WRITE, 0, 4);
-    for (int i = 0; i < 1; i++) {
-      System.out.println(byteBuffer.getInt(i));
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
index f040090..9f6a8c9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java
@@ -183,7 +183,7 @@ public abstract class AbstractTVList {
       }
       reverseRange(lo, runHi);
     } else {                              // Ascending
-      while (runHi < hi &&getTime(runHi) >= getTime(runHi - 1)) {
+      while (runHi < hi && getTime(runHi) >= getTime(runHi - 1)) {
         runHi++;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index be799bf..821d5c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -210,4 +210,8 @@ public class BinaryTVList extends TVList {
       }
     }
   }
+
+  void addBatchValue(Binary[] batch) {
+    values.add(batch);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java
 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java
new file mode 100644
index 0000000..75cb3b0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java
@@ -0,0 +1,158 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE;
+
+import org.apache.iotdb.db.nvm.space.NVMDataSpace;
+import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class NVMBinaryTVList extends NVMTVList {
+
+  // TODO
+  private Binary[][] sortedValues;
+
+  private Binary pivotValue;
+
+  NVMBinaryTVList(String sgId, String deviceId, String measurementId) {
+    super(sgId, deviceId, measurementId);
+    dataType = TSDataType.TEXT;
+  }
+
+  @Override
+  public void putBinary(long timestamp, Binary value) {
+    checkExpansion();
+    int arrayIndex = size / ARRAY_SIZE;
+    int elementIndex = size % ARRAY_SIZE;
+    minTime = minTime <= timestamp ? minTime : timestamp;
+    timestamps.get(arrayIndex).setData(elementIndex, timestamp);
+    values.get(arrayIndex).setData(elementIndex, value);
+    size++;
+    if (sorted && size > 1 && timestamp < getTime(size - 2)) {
+      sorted = false;
+    }
+  }
+
+  @Override
+  public Binary getBinary(int index) {
+    if (index >= size) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    int arrayIndex = index / ARRAY_SIZE;
+    int elementIndex = index % ARRAY_SIZE;
+    return (Binary) values.get(arrayIndex).getData(elementIndex);
+  }
+
+  @Override
+  public BinaryTVList clone() {
+    BinaryTVList cloneList = new BinaryTVList();
+    cloneAs(cloneList);
+    for (NVMDataSpace valueSpace : values) {
+      cloneList.addBatchValue((Binary[]) cloneValue(valueSpace));
+    }
+    return cloneList;
+  }
+
+  @Override
+  public void sort() {
+    if (sortedTimestamps == null || sortedTimestamps.length < size) {
+      sortedTimestamps = (long[][]) PrimitiveArrayPool
+          .getInstance().getDataListsByType(TSDataType.INT64, size);
+    }
+    if (sortedValues == null || sortedValues.length < size) {
+      sortedValues = (Binary[][]) PrimitiveArrayPool
+          .getInstance().getDataListsByType(dataType, size);
+    }
+    sort(0, size);
+    clearSortedValue();
+    clearSortedTime();
+    sorted = true;
+  }
+
+  @Override
+  protected void clearSortedValue() {
+    if (sortedValues != null) {
+      for (Binary[] dataArray : sortedValues) {
+        PrimitiveArrayPool.getInstance().release(dataArray);
+      }
+      sortedValues = null;
+    }
+  }
+
+  @Override
+  protected void setFromSorted(int src, int dest) {
+    set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], 
sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]);
+  }
+
+  @Override
+  protected void set(int src, int dest) {
+    long srcT = getTime(src);
+    Binary srcV = getBinary(src);
+    set(dest, srcT, srcV);
+  }
+
+  @Override
+  protected void setToSorted(int src, int dest) {
+    sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src);
+    sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getBinary(src);
+  }
+
+  @Override
+  protected void reverseRange(int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      long loT = getTime(lo);
+      Binary loV = getBinary(lo);
+      long hiT = getTime(hi);
+      Binary hiV = getBinary(hi);
+      set(lo++, hiT, hiV);
+      set(hi--, loT, loV);
+    }
+  }
+
+  @Override
+  protected void saveAsPivot(int pos) {
+    pivotTime = getTime(pos);
+    pivotValue = getBinary(pos);
+  }
+
+  @Override
+  protected void setPivotTo(int pos) {
+    set(pos, pivotTime, pivotValue);
+  }
+
+  @Override
+  public void putBinaries(long[] time, Binary[] value) {
+    checkExpansion();
+    int idx = 0;
+    int length = time.length;
+
+    for (int i = 0; i < length; i++) {
+      putBinary(time[i], value[i]);
+    }
+
+//    updateMinTimeAndSorted(time);
+//
+//    while (idx < length) {
+//      int inputRemaining = length - idx;
+//      int arrayIdx = size / ARRAY_SIZE;
+//      int elementIdx = size % ARRAY_SIZE;
+//      int internalRemaining  = ARRAY_SIZE - elementIdx;
+//      if (internalRemaining >= inputRemaining) {
+//        // the remaining inputs can fit the last array, copy all remaining 
inputs into last array
+//        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, 
inputRemaining);
+//        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, 
inputRemaining);
+//        size += inputRemaining;
+//        break;
+//      } else {
+//        // the remaining inputs cannot fit the last array, fill the last 
array and create a new
+//        // one and enter the next loop
+//        System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, 
internalRemaining);
+//        System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, 
internalRemaining);
+//        idx += internalRemaining;
+//        size += internalRemaining;
+//        checkExpansion();
+//      }
+//    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
index a71bd29..4570d66 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java
@@ -170,8 +170,7 @@ public abstract class NVMTVList extends AbstractTVList {
   public static NVMTVList newList(String sgId, String deviceId, String 
measurementId, TSDataType dataType) {
     switch (dataType) {
       case TEXT:
-        // TODO
-//        return new BinaryTVList();
+        return new NVMBinaryTVList(sgId, deviceId, measurementId);
       case FLOAT:
         return new NVMFloatTVList(sgId, deviceId, measurementId);
       case INT32:
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3ec590d..ce3df8d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -36,6 +36,7 @@ public enum TSStatusCode {
   STORAGE_GROUP_PROCESSOR_ERROR(311),
   STORAGE_GROUP_ERROR(312),
   STORAGE_ENGINE_ERROR(313),
+  NVMSPACE_MANAGER_EROOR(314),
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
   GENERATE_TIME_ZONE_ERROR(402),

Reply via email to