This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch opt_alignedTVList_column_order_check in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ba59a107bc854776afc8898e88be34efc420e7f0 Author: HTHou <[email protected]> AuthorDate: Fri May 19 14:58:26 2023 +0800 Optimize alignedTVList column order check --- .../engine/memtable/AlignedWritableMemChunk.java | 59 +++++++++++++++------- .../db/engine/memtable/IWritableMemChunk.java | 4 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 4 +- .../db/utils/datastructure/AlignedTVList.java | 49 +++++++----------- .../iotdb/db/utils/datastructure/TVList.java | 5 +- ...ectorTVListTest.java => AlignedTVListTest.java} | 16 +++--- 6 files changed, 72 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java index dabd3b472db..1306da7b3d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java @@ -40,7 +40,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -118,8 +118,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) { - list.putAlignedValue(t, v, columnIndexArray); + public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { + list.putAlignedValue(t, v); return list.reachMaxChunkSizeThreshold(); } @@ -156,8 +156,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public boolean putAlignedValuesWithFlushCheck( - long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) { - list.putAlignedValues(t, v, bitMaps, columnIndexArray, start, end); + long[] t, Object[] v, BitMap[] bitMaps, int start, int end) { + list.putAlignedValues(t, v, bitMaps, start, end); return list.reachMaxChunkSizeThreshold(); } @@ -169,8 +169,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public boolean writeAlignedValueWithFlushCheck( long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { - int[] columnIndexArray = checkColumnsInInsertPlan(schemaList); - return putAlignedValueWithFlushCheck(insertTime, objectValue, columnIndexArray); + Object[] reorderedValue = new Object[measurementIndexMap.size()]; + checkAndReorderColumnValuesInInsertPlan(schemaList, objectValue, null, reorderedValue, null); + return putAlignedValueWithFlushCheck(insertTime, reorderedValue); } @Override @@ -187,8 +188,15 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { List<IMeasurementSchema> schemaList, int start, int end) { - int[] columnIndexArray = checkColumnsInInsertPlan(schemaList); - return putAlignedValuesWithFlushCheck(times, valueList, bitMaps, columnIndexArray, start, end); + Object[] reorderedColumnValues = new Object[measurementIndexMap.size()]; + BitMap[] reorderedBitMaps = null; + if (bitMaps != null) { + reorderedBitMaps = new BitMap[measurementIndexMap.size()]; + } + checkAndReorderColumnValuesInInsertPlan( + schemaList, valueList, bitMaps, reorderedColumnValues, reorderedBitMaps); + return putAlignedValuesWithFlushCheck( + times, reorderedColumnValues, reorderedBitMaps, start, end); } /** @@ -198,24 +206,37 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { * have been deleted, there will be null in its slot. * @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]] */ - private int[] checkColumnsInInsertPlan(List<IMeasurementSchema> schemaListInInsertPlan) { - Map<String, Integer> measurementIdsInInsertPlan = new HashMap<>(); + private void checkAndReorderColumnValuesInInsertPlan( + List<IMeasurementSchema> schemaListInInsertPlan, + Object[] columnValues, + BitMap[] bitMaps, + Object[] reorderedColumnValues, + BitMap[] reorderedBitMaps) { for (int i = 0; i < schemaListInInsertPlan.size(); i++) { - if (schemaListInInsertPlan.get(i) != null) { - measurementIdsInInsertPlan.put(schemaListInInsertPlan.get(i).getMeasurementId(), i); - if (!containsMeasurement(schemaListInInsertPlan.get(i).getMeasurementId())) { + IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i); + if (measurementSchema != null) { + Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementId()); + // Index is null means this measurement was not in this AlignedTVList before. + // We need to extend a new column in AlignedMemChunk and AlignedTVList. + // And the reorderedColumnValues should extend one more column for the new measurement + if (index == null) { this.measurementIndexMap.put( schemaListInInsertPlan.get(i).getMeasurementId(), measurementIndexMap.size()); this.schemaList.add(schemaListInInsertPlan.get(i)); this.list.extendColumn(schemaListInInsertPlan.get(i).getType()); + index = measurementIndexMap.size(); + reorderedColumnValues = + Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1); + if (bitMaps != null) { + reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1); + } + } + reorderedColumnValues[index] = columnValues[i]; + if (bitMaps != null) { + reorderedBitMaps[index] = reorderedBitMaps[i]; } } } - int[] columnIndexArray = new int[measurementIndexMap.size()]; - measurementIndexMap.forEach( - (measurementId, i) -> - columnIndexArray[i] = measurementIdsInInsertPlan.getOrDefault(measurementId, -1)); - return columnIndexArray; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index 6eec2dc5598..24d0621adda 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -42,7 +42,7 @@ public interface IWritableMemChunk extends WALEntryValue { void putBoolean(long t, boolean v); - boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray); + boolean putAlignedValueWithFlushCheck(long t, Object[] v); void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end); @@ -57,7 +57,7 @@ public interface IWritableMemChunk extends WALEntryValue { void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end); boolean putAlignedValuesWithFlushCheck( - long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end); + long[] t, Object[] v, BitMap[] bitMaps, int start, int end); boolean writeWithFlushCheck(long insertTime, Object objectValue); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 11b6f55d4d7..57c9c0fece8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -159,7 +159,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) { + public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } @@ -197,7 +197,7 @@ public class WritableMemChunk implements IWritableMemChunk { @Override public boolean putAlignedValuesWithFlushCheck( - long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) { + long[] t, Object[] v, BitMap[] bitMaps, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index dfba8296963..f639b373f31 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -166,14 +166,14 @@ public abstract class AlignedTVList extends TVList { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override - public void putAlignedValue(long timestamp, Object[] value, int[] columnIndexArray) { + public void putAlignedValue(long timestamp, Object[] value) { checkExpansion(); int arrayIndex = rowCount / ARRAY_SIZE; int elementIndex = rowCount % ARRAY_SIZE; maxTime = Math.max(maxTime, timestamp); timestamps.get(arrayIndex)[elementIndex] = timestamp; for (int i = 0; i < values.size(); i++) { - Object columnValue = columnIndexArray[i] < 0 ? null : value[columnIndexArray[i]]; + Object columnValue = value[i]; List<Object> columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); @@ -699,8 +699,7 @@ public abstract class AlignedTVList extends TVList { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override - public void putAlignedValues( - long[] time, Object[] value, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) { + public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, int start, int end) { checkExpansion(); int idx = start; @@ -714,14 +713,12 @@ public abstract class AlignedTVList extends TVList { if (internalRemaining >= inputRemaining) { // the remaining inputs can fit the last array, copy all remaining inputs into last array System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining); - arrayCopy(value, idx, arrayIdx, elementIdx, inputRemaining, columnIndexArray); + arrayCopy(value, idx, arrayIdx, elementIdx, inputRemaining); for (int i = 0; i < inputRemaining; i++) { indices.get(arrayIdx)[elementIdx + i] = rowCount; for (int j = 0; j < values.size(); j++) { - if (columnIndexArray[j] < 0 - || bitMaps != null - && bitMaps[columnIndexArray[j]] != null - && bitMaps[columnIndexArray[j]].isMarked(idx + i)) { + if (value[j] == null + || bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)) { markNullValue(j, arrayIdx, elementIdx + i); } } @@ -732,14 +729,12 @@ public abstract class AlignedTVList extends TVList { // the remaining inputs cannot fit the last array, fill the last array and create a new // one and enter the next loop System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining); - arrayCopy(value, idx, arrayIdx, elementIdx, internalRemaining, columnIndexArray); + arrayCopy(value, idx, arrayIdx, elementIdx, internalRemaining); for (int i = 0; i < internalRemaining; i++) { indices.get(arrayIdx)[elementIdx + i] = rowCount; for (int j = 0; j < values.size(); j++) { - if (columnIndexArray[j] < 0 - || bitMaps != null - && bitMaps[columnIndexArray[j]] != null - && bitMaps[columnIndexArray[j]].isMarked(idx + i)) { + if (value[j] == null + || bitMaps != null && bitMaps[j] != null && bitMaps[j].isMarked(idx + i)) { markNullValue(j, arrayIdx, elementIdx + i); } } @@ -751,22 +746,16 @@ public abstract class AlignedTVList extends TVList { } } - private void arrayCopy( - Object[] value, - int idx, - int arrayIndex, - int elementIndex, - int remaining, - int[] columnIndexArray) { + private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex, int remaining) { for (int i = 0; i < values.size(); i++) { - if (columnIndexArray[i] < 0) { + if (value[i] == null) { continue; } List<Object> columnValues = values.get(i); switch (dataTypes.get(i)) { case TEXT: Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayT, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayT, elementIndex, remaining); // update raw size of Text chunk for (int i1 = 0; i1 < remaining; i1++) { @@ -779,23 +768,23 @@ public abstract class AlignedTVList extends TVList { break; case FLOAT: float[] arrayF = ((float[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayF, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayF, elementIndex, remaining); break; case INT32: int[] arrayI = ((int[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayI, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayI, elementIndex, remaining); break; case INT64: long[] arrayL = ((long[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayL, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayL, elementIndex, remaining); break; case DOUBLE: double[] arrayD = ((double[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayD, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayD, elementIndex, remaining); break; case BOOLEAN: boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex)); - System.arraycopy(value[columnIndexArray[i]], idx, arrayB, elementIndex, remaining); + System.arraycopy(value[i], idx, arrayB, elementIndex, remaining); break; default: break; @@ -1065,10 +1054,8 @@ public abstract class AlignedTVList extends TVList { public static AlignedTVList deserialize(DataInputStream stream) throws IOException { int dataTypeNum = stream.readInt(); List<TSDataType> dataTypes = new ArrayList<>(dataTypeNum); - int[] columnIndexArray = new int[dataTypeNum]; for (int columnIndex = 0; columnIndex < dataTypeNum; ++columnIndex) { dataTypes.add(ReadWriteIOUtils.readDataType(stream)); - columnIndexArray[columnIndex] = columnIndex; } int rowCount = stream.readInt(); @@ -1152,7 +1139,7 @@ public abstract class AlignedTVList extends TVList { } AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes); - tvList.putAlignedValues(times, values, bitMaps, columnIndexArray, 0, rowCount); + tvList.putAlignedValues(times, values, bitMaps, 0, rowCount); return tvList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 01d68e445a8..f1e2697f049 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -158,7 +158,7 @@ public abstract class TVList implements WALEntryValue { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public void putAlignedValue(long time, Object[] value, int[] columnIndexArray) { + public void putAlignedValue(long time, Object[] value) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -186,8 +186,7 @@ public abstract class TVList implements WALEntryValue { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public void putAlignedValues( - long[] time, Object[] value, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) { + public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, int start, int end) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java similarity index 96% rename from server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java rename to server/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java index 7f660e73b60..36fd5f3f30e 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java @@ -29,10 +29,10 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; -public class VectorTVListTest { +public class AlignedTVListTest { @Test - public void testVectorTVList1() { + public void testAlignedTVList1() { List<TSDataType> dataTypes = new ArrayList<>(); for (int i = 0; i < 5; i++) { dataTypes.add(TSDataType.INT64); @@ -45,7 +45,7 @@ public class VectorTVListTest { value[j] = i; columnOrder[j] = j; } - tvList.putAlignedValue(i, value, columnOrder); + tvList.putAlignedValue(i, value); } for (int i = 0; i < tvList.rowCount; i++) { StringBuilder builder = new StringBuilder("["); @@ -60,7 +60,7 @@ public class VectorTVListTest { } @Test - public void testVectorTVList2() { + public void testAlignedTVList2() { List<TSDataType> dataTypes = new ArrayList<>(); dataTypes.add(TSDataType.BOOLEAN); dataTypes.add(TSDataType.INT32); @@ -81,7 +81,7 @@ public class VectorTVListTest { for (int j = 0; j < 6; j++) { columnOrder[j] = j; } - tvList.putAlignedValue(i, value, columnOrder); + tvList.putAlignedValue(i, value); } tvList.sort(); for (int i = 0; i < tvList.rowCount; i++) { @@ -94,7 +94,7 @@ public class VectorTVListTest { } @Test - public void testVectorTVLists() { + public void testAlignedTVLists() { List<TSDataType> dataTypes = new ArrayList<>(); for (int i = 0; i < 5; i++) { dataTypes.add(TSDataType.INT64); @@ -124,7 +124,7 @@ public class VectorTVListTest { } @Test - public void testVectorTVListsWithBitMaps() { + public void testAlignedTVListsWithBitMaps() { List<TSDataType> dataTypes = new ArrayList<>(); BitMap[] bitMaps = new BitMap[5]; for (int i = 0; i < 5; i++) { @@ -224,7 +224,7 @@ public class VectorTVListTest { Object[] value = new Object[2]; value[0] = i; value[1] = new Binary(String.valueOf(i)); - tvList.putAlignedValue(i, value, columnOrder); + tvList.putAlignedValue(i, value); } Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 0);
