This is an automated email from the ASF dual-hosted git repository. east pushed a commit to branch nvmlogging in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2c0cf11cf26095359a03db94fd4fe35fa63397a6 Author: mdf369 <[email protected]> AuthorDate: Tue Jan 14 10:35:49 2020 +0800 add NVMBinaryTVList --- .../db/nvm/exception/NVMSpaceManagerException.java | 14 ++ .../iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java | 8 +- .../iotdb/db/nvm/space/NVMBinaryDataSpace.java | 62 ++++++++ .../apache/iotdb/db/nvm/space/NVMSpaceManager.java | 52 +++---- .../db/utils/datastructure/AbstractTVList.java | 2 +- .../iotdb/db/utils/datastructure/BinaryTVList.java | 4 + .../db/utils/datastructure/NVMBinaryTVList.java | 158 +++++++++++++++++++++ .../iotdb/db/utils/datastructure/NVMTVList.java | 3 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + 9 files changed, 274 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java b/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java new file mode 100644 index 0000000..891acc0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/nvm/exception/NVMSpaceManagerException.java @@ -0,0 +1,14 @@ +package org.apache.iotdb.db.nvm.exception; + +import org.apache.iotdb.db.exception.ProcessException; +import org.apache.iotdb.rpc.TSStatusCode; + +public class NVMSpaceManagerException extends ProcessException { + + private static final long serialVersionUID = 3502239072309147687L; + + public NVMSpaceManagerException(String message) { + super(message); + errorCode = TSStatusCode.NVMSPACE_MANAGER_EROOR.getStatusCode(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java index e992781..019bb27 100644 --- a/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java +++ b/server/src/main/java/org/apache/iotdb/db/nvm/rescon/NVMPrimitiveArrayPool.java @@ -3,9 +3,9 @@ package org.apache.iotdb.db.nvm.rescon; import java.util.ArrayDeque; import java.util.EnumMap; import org.apache.iotdb.db.nvm.PerfMonitor; +import org.apache.iotdb.db.nvm.exception.NVMSpaceManagerException; import org.apache.iotdb.db.nvm.space.NVMDataSpace; import org.apache.iotdb.db.nvm.space.NVMSpaceManager; -import org.apache.iotdb.db.nvm.space.NVMSpaceMetadataManager; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class NVMPrimitiveArrayPool { @@ -41,7 +41,11 @@ public class NVMPrimitiveArrayPool { long size = NVMSpaceManager.getPrimitiveTypeByteSize(dataType); if (nvmSpace == null) { - nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * ARRAY_SIZE, dataType, isTime); + try { + nvmSpace = NVMSpaceManager.getInstance().allocateDataSpace(size * ARRAY_SIZE, dataType, isTime); + } catch (NVMSpaceManagerException e) { + // TODO + } } PerfMonitor.add("NVM.getDataList" + (isTime ? "Time" : "Value"), System.currentTimeMillis() - time); diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java new file mode 100644 index 0000000..abde477 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMBinaryDataSpace.java @@ -0,0 +1,62 @@ +package org.apache.iotdb.db.nvm.space; + +import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE; + +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; + +public class NVMBinaryDataSpace extends NVMDataSpace { + + private int cacheSize; + private Binary[] cachedBinaries; + + NVMBinaryDataSpace(long offset, long size, ByteBuffer byteBuffer, int index, boolean recover) { + super(offset, size, byteBuffer, index, TSDataType.TEXT, false); + + cacheSize = 0; + cachedBinaries = new Binary[ARRAY_SIZE]; + if (recover) { + recoverCache(); + } + } + + private void recoverCache() { + int size = byteBuffer.getInt(); + cacheSize = size; + for (int i = 0; i < size; i++) { + int len = byteBuffer.getInt(); + byte[] bytes = new byte[len]; + byteBuffer.get(bytes); + cachedBinaries[i] = new Binary(bytes); + } + } + + @Override + public int getUnitNum() { + return cachedBinaries.length; + } + + @Override + public Object getData(int index) { + return cachedBinaries[index]; + } + + @Override + public void setData(int index, Object object) { + // todo nos support index + Binary binary = (Binary) object; + cachedBinaries[index] = binary; + if (index >= cacheSize) { + byteBuffer.putInt(0, index); + cacheSize = index; + } + byteBuffer.putInt(binary.getLength()); + byteBuffer.put(binary.getValues()); + } + + @Override + public Object toArray() { + return cachedBinaries; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java index d29a62f..7ae0f4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/nvm/space/NVMSpaceManager.java @@ -5,12 +5,12 @@ import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.nvm.exception.NVMSpaceManagerException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; @@ -23,6 +23,7 @@ public class NVMSpaceManager { private static final String NVM_FILE_NAME = "nvmFile"; public static final int NVMSPACE_NUM_MAX = 1000000; + private static final int TEXT_AVERAGE_SIZE_IN_BYTES = 100; private final static NVMSpaceManager INSTANCE = new NVMSpaceManager(); @@ -65,26 +66,33 @@ public class NVMSpaceManager { return nvmSpace; } - public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType, boolean isTime) { - checkIsFull(); + public synchronized NVMDataSpace allocateDataSpace(long size, TSDataType dataType, boolean isTime) + throws NVMSpaceManagerException { + checkIsNVMFull(size); try { logger.trace("Try to allocate NVMDataSpace from {} to {}", curOffset, curOffset + size); int index = curDataSpaceIndex.getAndIncrement(); - NVMDataSpace nvmSpace = new NVMDataSpace( - curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), index, dataType, isTime); + NVMDataSpace nvmSpace; + if (dataType == TSDataType.TEXT) { + nvmSpace = new NVMBinaryDataSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), + index, false); + } else { + nvmSpace = new NVMDataSpace(curOffset, size, nvmFileChannel.map(MAP_MODE, curOffset, size), + index, dataType, isTime); + } curOffset += size; return nvmSpace; } catch (IOException e) { - // TODO deal with error logger.error("Fail to allocate {} nvm space at {}.", size, curOffset); - e.printStackTrace(); - return null; + throw new NVMSpaceManagerException(e.getMessage()); } } - private void checkIsFull() { - // TODO + private void checkIsNVMFull(long sizeToAllocate) throws NVMSpaceManagerException { + if (curOffset + sizeToAllocate > nvmSize) { + throw new NVMSpaceManagerException("NVM space is used up, can't allocate more. (total: " + nvmSize + ", used: " + curOffset + ", to allocate: " + sizeToAllocate + ")"); + } } public NVMDataSpace getNVMDataSpaceByIndex(int spaceIndex) throws IOException { @@ -100,7 +108,14 @@ public class NVMSpaceManager { private synchronized NVMDataSpace recoverData(long offset, long size, int index, TSDataType dataType) throws IOException { logger.trace("Try to recover NVMSpace from {} to {}", offset, offset + size); - NVMDataSpace nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), index, dataType, false); + NVMDataSpace nvmSpace; + if (dataType == TSDataType.TEXT) { + nvmSpace = new NVMBinaryDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), + index, true); + } else { + nvmSpace = new NVMDataSpace(offset, size, nvmFileChannel.map(MAP_MODE, offset, size), index, dataType, + false); + } return nvmSpace; } @@ -127,24 +142,11 @@ public class NVMSpaceManager { size = Double.BYTES; break; case TEXT: - // TODO + size = TEXT_AVERAGE_SIZE_IN_BYTES; break; default: throw new UnSupportedDataTypeException("DataType: " + dataType); } return size; } - - public static void main(String[] args) throws IOException { - String nvmDir = IoTDBDescriptor.getInstance().getConfig().getNvmDir(); - String nvmFilePath = nvmDir + File.separatorChar + NVM_FILE_NAME; - File nvmDirFile = FSFactoryProducer.getFSFactory().getFile(nvmDir); - nvmDirFile.mkdirs(); - FileChannel nvmFileChannel = new RandomAccessFile(nvmFilePath, "rw").getChannel(); - - ByteBuffer byteBuffer = nvmFileChannel.map(MapMode.READ_WRITE, 0, 4); - for (int i = 0; i < 1; i++) { - System.out.println(byteBuffer.getInt(i)); - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java index f040090..9f6a8c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AbstractTVList.java @@ -183,7 +183,7 @@ public abstract class AbstractTVList { } reverseRange(lo, runHi); } else { // Ascending - while (runHi < hi &&getTime(runHi) >= getTime(runHi - 1)) { + while (runHi < hi && getTime(runHi) >= getTime(runHi - 1)) { runHi++; } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index be799bf..821d5c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -210,4 +210,8 @@ public class BinaryTVList extends TVList { } } } + + void addBatchValue(Binary[] batch) { + values.add(batch); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java new file mode 100644 index 0000000..75cb3b0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMBinaryTVList.java @@ -0,0 +1,158 @@ +package org.apache.iotdb.db.utils.datastructure; + +import static org.apache.iotdb.db.nvm.rescon.NVMPrimitiveArrayPool.ARRAY_SIZE; + +import org.apache.iotdb.db.nvm.space.NVMDataSpace; +import org.apache.iotdb.db.rescon.PrimitiveArrayPool; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; + +public class NVMBinaryTVList extends NVMTVList { + + // TODO + private Binary[][] sortedValues; + + private Binary pivotValue; + + NVMBinaryTVList(String sgId, String deviceId, String measurementId) { + super(sgId, deviceId, measurementId); + dataType = TSDataType.TEXT; + } + + @Override + public void putBinary(long timestamp, Binary value) { + checkExpansion(); + int arrayIndex = size / ARRAY_SIZE; + int elementIndex = size % ARRAY_SIZE; + minTime = minTime <= timestamp ? minTime : timestamp; + timestamps.get(arrayIndex).setData(elementIndex, timestamp); + values.get(arrayIndex).setData(elementIndex, value); + size++; + if (sorted && size > 1 && timestamp < getTime(size - 2)) { + sorted = false; + } + } + + @Override + public Binary getBinary(int index) { + if (index >= size) { + throw new ArrayIndexOutOfBoundsException(index); + } + int arrayIndex = index / ARRAY_SIZE; + int elementIndex = index % ARRAY_SIZE; + return (Binary) values.get(arrayIndex).getData(elementIndex); + } + + @Override + public BinaryTVList clone() { + BinaryTVList cloneList = new BinaryTVList(); + cloneAs(cloneList); + for (NVMDataSpace valueSpace : values) { + cloneList.addBatchValue((Binary[]) cloneValue(valueSpace)); + } + return cloneList; + } + + @Override + public void sort() { + if (sortedTimestamps == null || sortedTimestamps.length < size) { + sortedTimestamps = (long[][]) PrimitiveArrayPool + .getInstance().getDataListsByType(TSDataType.INT64, size); + } + if (sortedValues == null || sortedValues.length < size) { + sortedValues = (Binary[][]) PrimitiveArrayPool + .getInstance().getDataListsByType(dataType, size); + } + sort(0, size); + clearSortedValue(); + clearSortedTime(); + sorted = true; + } + + @Override + protected void clearSortedValue() { + if (sortedValues != null) { + for (Binary[] dataArray : sortedValues) { + PrimitiveArrayPool.getInstance().release(dataArray); + } + sortedValues = null; + } + } + + @Override + protected void setFromSorted(int src, int dest) { + set(dest, sortedTimestamps[src/ARRAY_SIZE][src%ARRAY_SIZE], sortedValues[src/ARRAY_SIZE][src%ARRAY_SIZE]); + } + + @Override + protected void set(int src, int dest) { + long srcT = getTime(src); + Binary srcV = getBinary(src); + set(dest, srcT, srcV); + } + + @Override + protected void setToSorted(int src, int dest) { + sortedTimestamps[dest/ARRAY_SIZE][dest% ARRAY_SIZE] = getTime(src); + sortedValues[dest/ARRAY_SIZE][dest%ARRAY_SIZE] = getBinary(src); + } + + @Override + protected void reverseRange(int lo, int hi) { + hi--; + while (lo < hi) { + long loT = getTime(lo); + Binary loV = getBinary(lo); + long hiT = getTime(hi); + Binary hiV = getBinary(hi); + set(lo++, hiT, hiV); + set(hi--, loT, loV); + } + } + + @Override + protected void saveAsPivot(int pos) { + pivotTime = getTime(pos); + pivotValue = getBinary(pos); + } + + @Override + protected void setPivotTo(int pos) { + set(pos, pivotTime, pivotValue); + } + + @Override + public void putBinaries(long[] time, Binary[] value) { + checkExpansion(); + int idx = 0; + int length = time.length; + + for (int i = 0; i < length; i++) { + putBinary(time[i], value[i]); + } + +// updateMinTimeAndSorted(time); +// +// while (idx < length) { +// int inputRemaining = length - idx; +// int arrayIdx = size / ARRAY_SIZE; +// int elementIdx = size % ARRAY_SIZE; +// int internalRemaining = ARRAY_SIZE - elementIdx; +// if (internalRemaining >= inputRemaining) { +// // the remaining inputs can fit the last array, copy all remaining inputs into last array +// System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining); +// System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining); +// size += inputRemaining; +// break; +// } else { +// // the remaining inputs cannot fit the last array, fill the last array and create a new +// // one and enter the next loop +// System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining); +// System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining); +// idx += internalRemaining; +// size += internalRemaining; +// checkExpansion(); +// } +// } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java index a71bd29..4570d66 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/NVMTVList.java @@ -170,8 +170,7 @@ public abstract class NVMTVList extends AbstractTVList { public static NVMTVList newList(String sgId, String deviceId, String measurementId, TSDataType dataType) { switch (dataType) { case TEXT: - // TODO -// return new BinaryTVList(); + return new NVMBinaryTVList(sgId, deviceId, measurementId); case FLOAT: return new NVMFloatTVList(sgId, deviceId, measurementId); case INT32: diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 3ec590d..ce3df8d 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -36,6 +36,7 @@ public enum TSStatusCode { STORAGE_GROUP_PROCESSOR_ERROR(311), STORAGE_GROUP_ERROR(312), STORAGE_ENGINE_ERROR(313), + NVMSPACE_MANAGER_EROOR(314), EXECUTE_STATEMENT_ERROR(400), SQL_PARSE_ERROR(401), GENERATE_TIME_ZONE_ERROR(402),
