This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch vectorMemTable in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 266202494c2a533c90c14152f804474678cbee64 Author: HTHou <[email protected]> AuthorDate: Fri Mar 12 19:45:53 2021 +0800 vector tvlist --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 54 ++--- .../db/engine/memtable/IWritableMemChunk.java | 4 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 9 +- .../iotdb/db/utils/datastructure/TVList.java | 12 +- .../iotdb/db/utils/datastructure/VectorTVList.java | 245 +++++++++++++++++---- .../db/engine/memtable/PrimitiveMemTableTest.java | 64 +++++- .../db/utils/datastructure/VectorTVListTest.java | 67 ++++-- .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 4 +- .../write/writer/VectorMeasurementSchemaStub.java | 34 +++ 10 files changed, 401 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 510258c..7c51053 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -101,7 +101,7 @@ public class MemTableFlushTask { long start = System.currentTimeMillis(); long sortTime = 0; - // for map do not use get(key) to iteratate + // for map do not use get(key) to iterate for (Map.Entry<String, Map<String, IWritableMemChunk>> memTableEntry : memTable.getMemTableMap().entrySet()) { encodingTaskQueue.put(new StartFlushGroupIOTask(memTableEntry.getKey())); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index f22070e..4b8c0e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -137,33 +137,33 @@ public abstract class AbstractMemTable implements IMemTable { insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); } - @Override - public void insert(InsertVectorPlan insertVectorPlan) { - updatePlanIndexes(insertVectorPlan.getIndex()); - Object[] values = insertVectorPlan.getValues(); - - MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes(); - String[] measurements = insertVectorPlan.getMeasurements(); - IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema(); - for (int i = 0; i < values.length; i++) { - Object value = values[i]; - if (value == null) { - continue; - } - - memSize += - MemUtils.getRecordSize( - vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl); - } - write( - insertVectorPlan.getDeviceId().getFullPath(), - vmSchema, - insertVectorPlan.getTime(), - values); - - totalPointsNum += - insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber(); - } +// @Override +// public void insert(InsertVectorPlan insertVectorPlan) { +// updatePlanIndexes(insertVectorPlan.getIndex()); +// Object[] values = insertVectorPlan.getValues(); +// +// MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes(); +// String[] measurements = insertVectorPlan.getMeasurements(); +// IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema(); +// for (int i = 0; i < values.length; i++) { +// Object value = values[i]; +// if (value == null) { +// continue; +// } +// +// memSize += +// MemUtils.getRecordSize( +// vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl); +// } +// write( +// insertVectorPlan.getDeviceId().getFullPath(), +// vmSchema, +// insertVectorPlan.getTime(), +// values); +// +// totalPointsNum += +// insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber(); +// } @Override public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index 10298e7..7505042 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -37,7 +37,7 @@ public interface IWritableMemChunk { void putBoolean(long t, boolean v); - void putVector(long t, byte[] v); + void putVector(long t, Object[] v); void putLongs(long[] t, long[] v, int start, int end); @@ -51,6 +51,8 @@ public interface IWritableMemChunk { void putBooleans(long[] t, boolean[] v, int start, int end); + void putVectors(long[] t, Object[][] v, int start, int end); + void write(long insertTime, Object objectValue); /** [start, end) */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 1bee40a..ffc5d3a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -56,7 +56,7 @@ public class WritableMemChunk implements IWritableMemChunk { putBinary(insertTime, (Binary) objectValue); break; case VECTOR: - putVector(insertTime, (byte[]) objectValue); + putVector(insertTime, (Object[]) objectValue); break; default: throw new UnSupportedDataTypeException("Unsupported data type:" + schema.getType()); @@ -126,7 +126,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public void putVector(long t, byte[] v) { + public void putVector(long t, Object[] v) { list.putVector(t, v); } @@ -161,6 +161,11 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override + public void putVectors(long[] t, Object[][] v, int start, int end) { + list.putVectors(t, v, start, end); + } + + @Override public synchronized TVList getSortedTVListForQuery() { sortTVList(); // increase reference count diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 3e0ef74..9cba9c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -73,14 +73,16 @@ public abstract class TVList { return new DoubleTVList(); case BOOLEAN: return new BooleanTVList(); - case VECTOR: - return new VectorTVList(); default: break; } return null; } + public static TVList newVectorList(List<TSDataType> datatypes) { + return new VectorTVList(datatypes); + } + public static long tvListArrayMemSize(TSDataType type) { long size = 0; // time size @@ -139,7 +141,7 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public void putVector(long time, byte[] value) { + public void putVector(long time, Object[] value) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -167,7 +169,7 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public void putVectors(long[] time, byte[][] value, int start, int end) { + public void putVectors(long[] time, Object[] value, int start, int end) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -195,7 +197,7 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public byte[] getVector(int index) { + public Object getVector(int index) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java index e0a652a..6c472ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.util.ArrayList; @@ -31,25 +32,60 @@ import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE; public class VectorTVList extends TVList { - private List<Object[]> values; + private List<TSDataType> dataTypes; - private byte[][][] sortedValues; + private List<List<Object>> values; - private byte[] pivotValue; + private List<int[]> indices; - VectorTVList() { + private int[][] sortedIndices; + + private int pivotIndex; + + VectorTVList(List<TSDataType> types) { super(); + indices = new ArrayList<>(); + dataTypes = types; values = new ArrayList<>(); + for (int i = 0; i < types.size(); i++) { + values.add(new ArrayList<>()); + } } @Override - public void putVector(long timestamp, byte[] value) { + public void putVector(long timestamp, Object[] value) { checkExpansion(); int arrayIndex = size / ARRAY_SIZE; int elementIndex = size % ARRAY_SIZE; minTime = Math.min(minTime, timestamp); timestamps.get(arrayIndex)[elementIndex] = timestamp; - values.get(arrayIndex)[elementIndex] = value; + for (int i = 0; i < values.size(); i++) { + Object columnValue = value[i]; + List<Object> columnValues = values.get(i); + switch (dataTypes.get(i)) { + case TEXT: + ((Binary[]) columnValues.get(arrayIndex))[elementIndex] = ((Binary[]) columnValue)[0]; + break; + case FLOAT: + ((float[]) columnValues.get(arrayIndex))[elementIndex] = ((float[]) columnValue)[0]; + break; + case INT32: + ((int[]) columnValues.get(arrayIndex))[elementIndex] = ((int[]) columnValue)[0]; + break; + case INT64: + ((long[]) columnValues.get(arrayIndex))[elementIndex] = ((long[]) columnValue)[0]; + break; + case DOUBLE: + ((double[]) columnValues.get(arrayIndex))[elementIndex] = ((double[]) columnValue)[0]; + break; + case BOOLEAN: + ((boolean[]) columnValues.get(arrayIndex))[elementIndex] = ((boolean[]) columnValue)[0]; + break; + default: + break; + } + } + indices.get(arrayIndex)[elementIndex] = size; size++; if (sorted && size > 1 && timestamp < getTime(size - 2)) { sorted = false; @@ -57,39 +93,109 @@ public class VectorTVList extends TVList { } @Override - public byte[] getVector(int index) { + public Object getVector(int index) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index); } int arrayIndex = index / ARRAY_SIZE; int elementIndex = index % ARRAY_SIZE; - return (byte[]) values.get(arrayIndex)[elementIndex]; + int valueIndex = indices.get(arrayIndex)[elementIndex]; + return getVectorByValueIndex(valueIndex); } - protected void set(int index, long timestamp, byte[] value) { + private Object getVectorByValueIndex(int valueIndex) { + if (valueIndex >= size) { + throw new ArrayIndexOutOfBoundsException(valueIndex); + } + int arrayIndex = valueIndex / ARRAY_SIZE; + int elementIndex = valueIndex % ARRAY_SIZE; + TsPrimitiveType[] vector = new TsPrimitiveType[values.size()]; + for (int i = 0; i < values.size(); i++) { + List<Object> columnValues = values.get(i); + switch (dataTypes.get(i)) { + case TEXT: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((Binary[]) columnValues.get(arrayIndex))[elementIndex]); + break; + case FLOAT: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((float[]) columnValues.get(arrayIndex))[elementIndex]); + break; + case INT32: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((int[]) columnValues.get(arrayIndex))[elementIndex]); + break; + case INT64: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((long[]) columnValues.get(arrayIndex))[elementIndex]); + break; + case DOUBLE: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((double[]) columnValues.get(arrayIndex))[elementIndex]); + break; + case BOOLEAN: + vector[i] = TsPrimitiveType.getByType(dataTypes.get(i), ((boolean[]) columnValues.get(arrayIndex))[elementIndex]); + break; + default: + break; + } + } + return TsPrimitiveType.getByType(TSDataType.VECTOR, vector); + } + + protected void set(int index, long timestamp, int valueIndex) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index); } int arrayIndex = index / ARRAY_SIZE; int elementIndex = index % ARRAY_SIZE; timestamps.get(arrayIndex)[elementIndex] = timestamp; - values.get(arrayIndex)[elementIndex] = value; + indices.get(arrayIndex)[elementIndex] = valueIndex; } @Override public VectorTVList clone() { - VectorTVList cloneList = new VectorTVList(); + VectorTVList cloneList = new VectorTVList(dataTypes); cloneAs(cloneList); - for (Object[] valueArray : values) { - cloneList.values.add(cloneValue(valueArray)); + for (int i = 0; i < values.size(); i++) { + List<Object> columnValues = values.get(i); + for (Object valueArray : columnValues) { + cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); + } } return cloneList; } - private TsPrimitiveType[][] cloneValue(Object[] valueArray) { - TsPrimitiveType[][] cloneArray = (TsPrimitiveType[][])new Object[valueArray.length]; - System.arraycopy(valueArray, 0, cloneArray, 0, valueArray.length); - return cloneArray; + private Object cloneValue(TSDataType type, Object value) { + switch (type) { + case TEXT: + Binary[] valueT = (Binary[]) value; + Binary[] cloneT = new Binary[valueT.length]; + System.arraycopy(valueT, 0, cloneT, 0, valueT.length); + return cloneT; + case FLOAT: + float[] valueF = (float[]) value; + float[] cloneF= new float[valueF.length]; + System.arraycopy(valueF, 0, cloneF, 0, valueF.length); + return cloneF; + case INT32: + int[] valueI = (int[]) value; + int[] cloneI = new int[valueI.length]; + System.arraycopy(valueI, 0, cloneI, 0, valueI.length); + return cloneI; + case INT64: + long[] valueL = (long[]) value; + long[] cloneL = new long[valueL.length]; + System.arraycopy(valueL, 0, cloneL, 0, valueL.length); + return cloneL; + case DOUBLE: + double[] valueD = (double[]) value; + double[] cloneD = new double[valueD.length]; + System.arraycopy(valueD, 0, cloneD, 0, valueD.length); + return cloneD; + case BOOLEAN: + boolean[] valueB = (boolean[]) value; + boolean[] cloneB = new boolean[valueB.length]; + System.arraycopy(valueB, 0, cloneB, 0, valueB.length); + return cloneB; + default: + return null; + } } @Override @@ -98,10 +204,8 @@ public class VectorTVList extends TVList { sortedTimestamps = (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, size); } - if (sortedValues == null || sortedValues.length < size) { - sortedValues = - (byte[][][]) - PrimitiveArrayManager.createDataListsByType(TSDataType.VECTOR, size); + if (sortedIndices == null || sortedIndices.length < size) { + sortedIndices = (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, size); } sort(0, size); clearSortedValue(); @@ -111,18 +215,27 @@ public class VectorTVList extends TVList { @Override void clearValue() { - if (values != null) { - for (Object[] dataArray : values) { + if (indices != null) { + for (int[] dataArray : indices) { PrimitiveArrayManager.release(dataArray); } - values.clear(); + indices.clear(); + } + for (int i = 0; i < dataTypes.size(); i++) { + List<Object> columnValues = values.get(i); + if (columnValues != null) { + for (Object dataArray : columnValues) { + PrimitiveArrayManager.release(dataArray); + } + columnValues.clear(); + } } } @Override void clearSortedValue() { - if (sortedValues != null) { - sortedValues = null; + if (sortedIndices != null) { + sortedIndices = null; } } @@ -131,20 +244,20 @@ public class VectorTVList extends TVList { set( dest, sortedTimestamps[src / ARRAY_SIZE][src % ARRAY_SIZE], - sortedValues[src / ARRAY_SIZE][src % ARRAY_SIZE]); + sortedIndices[src / ARRAY_SIZE][src % ARRAY_SIZE]); } @Override protected void set(int src, int dest) { long srcT = getTime(src); - byte[] srcV = getVector(src); + int srcV = getValueIndex(src); set(dest, srcT, srcV); } @Override protected void setToSorted(int src, int dest) { sortedTimestamps[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getTime(src); - sortedValues[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getVector(src); + sortedIndices[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getValueIndex(src); } @Override @@ -152,9 +265,9 @@ public class VectorTVList extends TVList { hi--; while (lo < hi) { long loT = getTime(lo); - byte[] loV = getVector(lo); + int loV = getValueIndex(lo); long hiT = getTime(hi); - byte[] hiV = getVector(hi); + int hiV = getValueIndex(hi); set(lo++, hiT, hiV); set(hi--, loT, loV); } @@ -162,18 +275,30 @@ public class VectorTVList extends TVList { @Override protected void expandValues() { - values.add((Object[]) getPrimitiveArraysByType(TSDataType.VECTOR)); + indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); + for (int i = 0; i < dataTypes.size(); i++) { + values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + } } @Override protected void saveAsPivot(int pos) { pivotTime = getTime(pos); - pivotValue = getVector(pos); + pivotIndex = getValueIndex(pos); + } + + private int getValueIndex(int index) { + if (index >= size) { + throw new ArrayIndexOutOfBoundsException(index); + } + int arrayIndex = index / ARRAY_SIZE; + int elementIndex = index % ARRAY_SIZE; + return indices.get(arrayIndex)[elementIndex]; } @Override protected void setPivotTo(int pos) { - set(pos, pivotTime, pivotValue); + set(pos, pivotTime, pivotIndex); } @Override @@ -190,11 +315,11 @@ public class VectorTVList extends TVList { @Override protected void releaseLastValueArray() { - PrimitiveArrayManager.release(values.remove(values.size() - 1)); + PrimitiveArrayManager.release(indices.remove(indices.size() - 1)); } @Override - public void putVectors(long[] time, byte[][] value, int start, int end) { + public void putVectors(long[] time, Object[] value, int start, int end) { checkExpansion(); int idx = start; @@ -208,21 +333,61 @@ public class VectorTVList extends TVList { if (internalRemaining >= inputRemaining) { // the remaining inputs can fit the last array, copy all remaining inputs into last array System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining); - System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining); - size += inputRemaining; + arrayCopy(value, idx, arrayIdx, elementIdx, inputRemaining); + for (int i = 0; i < inputRemaining; i++) { + indices.get(arrayIdx)[elementIdx] = size; + size++; + } break; } else { // the remaining inputs cannot fit the last array, fill the last array and create a new // one and enter the next loop System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining); - System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining); + arrayCopy(value, idx, arrayIdx, elementIdx, internalRemaining); idx += internalRemaining; - size += internalRemaining; + for (int i = 0; i < internalRemaining; i++) { + indices.get(arrayIdx)[elementIdx] = size; + size++; + } checkExpansion(); } } } + private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex, int remaining) { + for (int i = 0; i < values.size(); i++) { + List<Object> columnValues = values.get(i); + switch (dataTypes.get(i)) { + case TEXT: + Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex)); + System.arraycopy((Binary[]) value[i], idx, arrayT, elementIndex, remaining); + break; + case FLOAT: + float[] arrayF = ((float[]) columnValues.get(arrayIndex)); + System.arraycopy((float[]) value[i], idx, arrayF, elementIndex, remaining); + break; + case INT32: + int[] arrayI = ((int[]) columnValues.get(arrayIndex)); + System.arraycopy((int[]) value[i], idx, arrayI, elementIndex, remaining); + break; + case INT64: + long[] arrayL = ((long[]) columnValues.get(arrayIndex)); + System.arraycopy((long[]) value[i], idx, arrayL, elementIndex, remaining); + break; + case DOUBLE: + double[] arrayD = ((double[]) columnValues.get(arrayIndex)); + System.arraycopy((double[]) value[i], idx, arrayD, elementIndex, remaining); + break; + case BOOLEAN: + boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex)); + System.arraycopy((boolean[]) value[i], idx, arrayB, elementIndex, remaining); + break; + default: + break; + } + } + } + @Override public TSDataType getDataType() { return TSDataType.VECTOR; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java index 907beb2..1494658 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java @@ -31,8 +31,8 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -173,6 +173,53 @@ public class PrimitiveMemTableTest { } } + private void writeVector( + IMemTable memTable, + String deviceId, + String[] sensorIds, + TSDataType[] types, + TSEncoding[] encodings, + int size) + throws IOException, QueryProcessException, MetadataException { + TimeValuePair[] ret = genTimeValuePair(size, TSDataType.VECTOR); + + for (TimeValuePair aRet : ret) { + memTable.write( + deviceId, + sensorIds[0], + new VectorMeasurementSchema(sensorIds, types, encodings), + aRet.getTimestamp(), + aRet.getValue().getValue()); + } + IPointReader tvPair = + memTable + .query( + deviceId, + sensorIds[0], + types[0], + encodings[0], + Collections.emptyMap(), + Long.MIN_VALUE, + null) + .getPointReader(); + Arrays.sort(ret); + TimeValuePair last = null; + for (int i = 0; i < ret.length; i++) { + while (last != null && (i < ret.length && last.getTimestamp() == ret[i].getTimestamp())) { + i++; + } + if (i >= ret.length) { + break; + } + TimeValuePair pair = ret[i]; + last = pair; + tvPair.hasNextTimeValuePair(); + TimeValuePair next = tvPair.nextTimeValuePair(); + Assert.assertEquals(pair.getTimestamp(), next.getTimestamp()); + Assert.assertEquals(pair.getValue(), next.getValue()); + } + } + @Test public void testFloatType() throws IOException, QueryProcessException, MetadataException { IMemTable memTable = new PrimitiveMemTable(); @@ -199,6 +246,13 @@ public class PrimitiveMemTableTest { write(memTable, deviceId, measurementId[index++], TSDataType.FLOAT, TSEncoding.RLE, size); write(memTable, deviceId, measurementId[index++], TSDataType.DOUBLE, TSEncoding.RLE, size); write(memTable, deviceId, measurementId[index++], TSDataType.TEXT, TSEncoding.PLAIN, size); + writeVector( + memTable, + deviceId, + new String[] {measurementId[index++]}, + new TSDataType[] {TSDataType.INT32}, + new TSEncoding[] {TSEncoding.RLE}, + size); } private TimeValuePair[] genTimeValuePair(int size, TSDataType dataType) { @@ -235,6 +289,14 @@ public class PrimitiveMemTableTest { rand.nextLong(), TsPrimitiveType.getByType(dataType, new Binary("a" + rand.nextDouble()))); break; + case VECTOR: + TsPrimitiveType[] values = new TsPrimitiveType[1]; + values[0] = TsPrimitiveType.getByType(TSDataType.INT32, rand.nextInt()); + ret[i] = + new TimeValuePair( + rand.nextLong(), + TsPrimitiveType.getByType(dataType, values)); + break; default: throw new UnSupportedDataTypeException("Unsupported data type:" + dataType); } diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java index 7095612..60f25ce 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java @@ -18,8 +18,8 @@ */ package org.apache.iotdb.db.utils.datastructure; -import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.commons.lang3.ArrayUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Assert; import org.junit.Test; @@ -29,37 +29,74 @@ import java.util.List; public class VectorTVListTest { @Test - public void testVectorTVList() { - VectorTVList tvList = new VectorTVList(); + public void testVectorTVList1() { + List<TSDataType> dataTypes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + dataTypes.add(TSDataType.INT32); + } + VectorTVList tvList = new VectorTVList(dataTypes); for (int i = 0; i < 1000; i++) { - byte[] value = new byte[4 * 5]; - byte[] bytes = new byte[4]; - for (int j = 0; j < 20; j++) { - if (j % 4 == 0) { - bytes = BytesUtils.intToBytes(i); - } - value[j] = bytes[j % 4]; + int[][] value = new int[5][1]; + for (int j = 0; j < 5; j++) { + value[j][0] = i; } tvList.putVector(i, value); } for (int i = 0; i < tvList.size; i++) { - Assert.assertEquals(String.valueOf(i), tvList.getVector(i).toString()); + StringBuilder builder = new StringBuilder("["); + builder.append(String.valueOf(i)); + for (int j = 1; j < 5; j++) { + builder.append(", ").append(String.valueOf(i)); + } + builder.append("]"); + Assert.assertEquals(builder.toString(), tvList.getVector(i).toString()); + Assert.assertEquals(i, tvList.getTime(i)); + } + } + + @Test + public void testVectorTVList2() { + List<TSDataType> dataTypes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + dataTypes.add(TSDataType.INT32); + } + VectorTVList tvList = new VectorTVList(dataTypes); + for (int i = 1000; i >= 0; i--) { + int[][] value = new int[5][1]; + for (int j = 0; j < 5; j++) { + value[j][0] = i; + } + tvList.putVector(i, value); + } + tvList.sort(); + for (int i = 0; i < tvList.size; i++) { + StringBuilder builder = new StringBuilder("["); + builder.append(String.valueOf(i)); + for (int j = 1; j < 5; j++) { + builder.append(", ").append(String.valueOf(i)); + } + builder.append("]"); + Assert.assertEquals(builder.toString(), tvList.getVector(i).toString()); Assert.assertEquals(i, tvList.getTime(i)); } } @Test public void testVectorTVLists() { - VectorTVList tvList = new VectorTVList(); - byte[][] vectorList = new byte[1001][4 * 5]; + List<TSDataType> dataTypes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + dataTypes.add(TSDataType.INT64); + } + VectorTVList tvList = new VectorTVList(dataTypes); + long[][] vectorArray = new long[5][1001]; List<Long> timeList = new ArrayList<>(); for (int i = 1000; i >= 0; i--) { timeList.add((long) i); for (int j = 0; j < 5; j++) { - vectorList[i][j] = 0; + vectorArray[j][i] = (long) i; } } - tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorList, 0, 1000); + tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 0, 1000); for (long i = 0; i < tvList.size; i++) { Assert.assertEquals(tvList.size - i, tvList.getTime((int) i)); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java index b4efd18..1deb4b5 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java @@ -510,8 +510,8 @@ public abstract class TsPrimitiveType implements Serializable { public String getStringValue() { StringBuilder builder = new StringBuilder("["); builder.append(value[0].getStringValue()); - for (TsPrimitiveType type : value) { - builder.append(", ").append(type.getStringValue()); + for (int i = 1; i < value.length; i++) { + builder.append(", ").append(value[i].getStringValue()); } builder.append("]"); return builder.toString(); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java index 40335f5..f68687b 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/VectorMeasurementSchemaStub.java @@ -25,8 +25,12 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.Map; public class VectorMeasurementSchemaStub implements IMeasurementSchema { @@ -77,4 +81,34 @@ public class VectorMeasurementSchemaStub implements IMeasurementSchema { new PlainEncoder(TSDataType.INT32, 0), new PlainEncoder(TSDataType.DOUBLE, 0)); } + + @Override + public TSEncoding getEncodingType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Encoder getValueEncoder() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map<String, String> getProps() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int serializeTo(ByteBuffer buffer) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int serializeTo(OutputStream outputStream) throws IOException { + // TODO Auto-generated method stub + return 0; + } }
