This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5e66c15c14b [To dev/1.3] fix: MemoryNotEnough exception & clone &
delete issues (#15170)
5e66c15c14b is described below
commit 5e66c15c14b2c8a385a9ce8e56f7be3bf6e4b1fa
Author: shizy <[email protected]>
AuthorDate: Thu Mar 27 18:42:26 2025 +0800
[To dev/1.3] fix: MemoryNotEnough exception & clone & delete issues (#15170)
* fix: MemoryNotEnough exception when flushing try to release tvlist
* retry release tvlist if memory not enough during flush
* synchronized clone method for TVList
* query context list -> query context set
* fix: remove query context when abort finish
* refactor: reorganize AbstractWritableMemChunk
* fix: float/double value
* fix: some testcase bugs
* fix: delete column bug for aligned timeseries
* fix: clone data types list when create new AlignedTVList
* fix: bitmap is null when delete column
* init bitmaps when it is null
* format
* fix: error from cherry-pick
* set memchunk datatypes when handover
---
.../it/schema/IoTDBDeleteAlignedTimeseriesIT.java | 42 +++
.../fragment/FragmentInstanceContext.java | 42 ++-
.../schemaregion/utils/ResourceByPathUtils.java | 12 +-
.../memtable/AbstractWritableMemChunk.java | 214 +++++++++++++++
.../memtable/AlignedWritableMemChunk.java | 299 ++++++++-------------
.../dataregion/memtable/IWritableMemChunk.java | 29 --
.../dataregion/memtable/WritableMemChunk.java | 173 +++---------
.../db/utils/datastructure/AlignedTVList.java | 82 +++---
.../iotdb/db/utils/datastructure/BinaryTVList.java | 2 +-
.../db/utils/datastructure/BooleanTVList.java | 2 +-
.../iotdb/db/utils/datastructure/DoubleTVList.java | 2 +-
.../iotdb/db/utils/datastructure/FloatTVList.java | 2 +-
.../iotdb/db/utils/datastructure/IntTVList.java | 2 +-
.../iotdb/db/utils/datastructure/LongTVList.java | 2 +-
.../datastructure/MultiAlignedTVListIterator.java | 21 +-
.../iotdb/db/utils/datastructure/TVList.java | 21 +-
.../dataregion/memtable/PrimitiveMemTableTest.java | 83 ++++++
17 files changed, 601 insertions(+), 429 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
index 3defa886f8d..79198e97429 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
@@ -246,4 +246,46 @@ public class IoTDBDeleteAlignedTimeseriesIT extends
AbstractSchemaIT {
fail(e.getMessage());
}
}
+
+ @Test
+ public void deleteTimeseriesAndCreateSameTypeTest2() throws Exception {
+ String[] retArray = new String[] {"1,4.0,", "2,8.0,"};
+ int cnt = 0;
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "create aligned timeseries root.turbine1.d1(s1 FLOAT encoding=PLAIN
compression=SNAPPY, "
+ + "s2 INT64 encoding=PLAIN compression=SNAPPY, s4 DOUBLE
encoding=PLAIN compression=SNAPPY)");
+ statement.execute("INSERT INTO root.turbine1.d1(timestamp,s1,s2,s4)
ALIGNED VALUES(1,1,2,4)");
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM
root.turbine1.d1")) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ }
+ // delete series in the middle
+ statement.execute("DELETE timeseries root.turbine1.d1.s4");
+ statement.execute(
+ "INSERT INTO root.turbine1.d1(timestamp,s3,s4) ALIGNED
VALUES(2,false,8.0)");
+ statement.execute("FLUSH");
+
+ try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM
root.turbine1.d1")) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ while (resultSet.next()) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ builder.append(resultSet.getString(i)).append(",");
+ }
+ Assert.assertEquals(retArray[cnt], builder.toString());
+ cnt++;
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 58f06bfb270..7e2c144b0fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -186,6 +186,22 @@ public class FragmentInstanceContext extends QueryContext {
return instanceContext;
}
+ @TestOnly
+ public static FragmentInstanceContext createFragmentInstanceContext(
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ MemoryReservationManager memoryReservationManager) {
+ FragmentInstanceContext instanceContext =
+ new FragmentInstanceContext(
+ id,
+ stateMachine,
+ new SessionInfo(1, "test", ZoneId.systemDefault()),
+ memoryReservationManager);
+ instanceContext.initialize();
+ instanceContext.start();
+ return instanceContext;
+ }
+
private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
@@ -218,6 +234,20 @@ public class FragmentInstanceContext extends QueryContext {
new ThreadSafeMemoryReservationManager(id.getQueryId(),
this.getClass().getName());
}
+ private FragmentInstanceContext(
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ SessionInfo sessionInfo,
+ MemoryReservationManager memoryReservationManager) {
+ this.id = id;
+ this.stateMachine = stateMachine;
+ this.executionEndTime.set(END_TIME_INITIAL_VALUE);
+ this.sessionInfo = sessionInfo;
+ this.dataNodeQueryContextMap = null;
+ this.dataNodeQueryContext = null;
+ this.memoryReservationManager = memoryReservationManager;
+ }
+
private FragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
@@ -651,11 +681,11 @@ public class FragmentInstanceContext extends QueryContext
{
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
tvList.lockQueryList();
- List<QueryContext> queryContextList = tvList.getQueryContextList();
+ Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
try {
- queryContextList.remove(this);
+ queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
- if (queryContextList.isEmpty()) {
+ if (queryContextSet.isEmpty()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is
{}",
tvList,
@@ -663,11 +693,13 @@ public class FragmentInstanceContext extends QueryContext
{
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
tvList.clear();
} else {
+ FragmentInstanceContext queryContext =
+ (FragmentInstanceContext) queryContextSet.iterator().next();
LOGGER.debug(
"TVList {} is now owned by another query, FragmentInstance Id
is {}",
tvList,
- ((FragmentInstanceContext) queryContextList.get(0)).getId());
- tvList.setOwnerQuery(queryContextList.get(0));
+ queryContext.getId());
+ tvList.setOwnerQuery(queryContext);
}
}
} finally {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index adba4a163dc..78dce2cff39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -135,7 +135,7 @@ public abstract class ResourceByPathUtils {
try {
LOGGER.debug(
"Flushing/Working MemTable - add current query context to
immutable TVList's query list");
- tvList.getQueryContextList().add(context);
+ tvList.getQueryContextSet().add(context);
tvListQueryMap.put(tvList, tvList.rowCount());
} finally {
tvList.unlockQueryList();
@@ -155,13 +155,13 @@ public abstract class ResourceByPathUtils {
if (!isWorkMemTable) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's
query list");
- list.getQueryContextList().add(context);
+ list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
- if (list.isSorted() || list.getQueryContextList().isEmpty()) {
+ if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
"Working MemTable - add current query context to mutable
TVList's query list when it's sorted or no other query on it");
- list.getQueryContextList().add(context);
+ list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
/*
@@ -180,7 +180,7 @@ public abstract class ResourceByPathUtils {
*/
LOGGER.debug(
"Working MemTable - clone mutable TVList and replace old TVList
in working MemTable");
- QueryContext firstQuery = list.getQueryContextList().get(0);
+ QueryContext firstQuery =
list.getQueryContextSet().iterator().next();
// reserve query memory
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
@@ -191,7 +191,7 @@ public abstract class ResourceByPathUtils {
// clone TVList
cloneList = list.clone();
- cloneList.getQueryContextList().add(context);
+ cloneList.getQueryContextSet().add(context);
tvListQueryMap.put(cloneList, cloneList.rowCount());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
new file mode 100644
index 00000000000..1030f2f641a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.memtable;
+
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
+ protected static long RETRY_INTERVAL_MS = 100L;
+ protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
+
+ /**
+ * Release the TVList if there is no query on it. Otherwise, it should set
the first query as the
+ * owner. TVList is released until all queries finish. If it throws
memory-not-enough exception
+ * during owner transfer, retry the release process after 100ms. If the
problem is still not
+ * solved in 60s, it starts to abort first query, kick it out of the query
list and retry. This
+ * method must ensure success because it's part of flushing.
+ *
+ * @param tvList
+ */
+ protected void maybeReleaseTvList(TVList tvList) {
+ long startTimeInMs = System.currentTimeMillis();
+ boolean succeed = false;
+ while (!succeed) {
+ try {
+ tryReleaseTvList(tvList);
+ succeed = true;
+ } catch (MemoryNotEnoughException ex) {
+ long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
+ if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
+ // Abort first query in the list. When all queries in the list have
been aborted,
+ // tryReleaseTvList will ensure succeed finally.
+ tvList.lockQueryList();
+ try {
+ // fail the first query
+ Iterator<QueryContext> iterator =
tvList.getQueryContextSet().iterator();
+ if (iterator.hasNext()) {
+ FragmentInstanceContext firstQuery = (FragmentInstanceContext)
iterator.next();
+ firstQuery.failed(
+ new MemoryNotEnoughException(
+ "Memory not enough to clone the tvlist during flush
phase"));
+ }
+ } finally {
+ tvList.unlockQueryList();
+ }
+ }
+
+ // sleep 100ms to retry
+ try {
+ Thread.sleep(RETRY_INTERVAL_MS);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ private void tryReleaseTvList(TVList tvList) {
+ tvList.lockQueryList();
+ try {
+ if (tvList.getQueryContextSet().isEmpty()) {
+ tvList.clear();
+ } else {
+ QueryContext firstQuery =
tvList.getQueryContextSet().iterator().next();
+ // transfer memory from write process to read process. Here it
reserves read memory and
+ // releaseFlushedMemTable will release write memory.
+ if (firstQuery instanceof FragmentInstanceContext) {
+ MemoryReservationManager memoryReservationManager =
+ ((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
+
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
+ }
+ // update current TVList owner to first query in the list
+ tvList.setOwnerQuery(firstQuery);
+ }
+ } finally {
+ tvList.unlockQueryList();
+ }
+ }
+
+ @Override
+ public abstract void putLong(long t, long v);
+
+ @Override
+ public abstract void putInt(long t, int v);
+
+ @Override
+ public abstract void putFloat(long t, float v);
+
+ @Override
+ public abstract void putDouble(long t, double v);
+
+ @Override
+ public abstract void putBinary(long t, Binary v);
+
+ @Override
+ public abstract void putBoolean(long t, boolean v);
+
+ @Override
+ public abstract void putAlignedRow(long t, Object[] v);
+
+ @Override
+ public abstract void putLongs(long[] t, long[] v, BitMap bitMap, int start,
int end);
+
+ @Override
+ public abstract void putInts(long[] t, int[] v, BitMap bitMap, int start,
int end);
+
+ @Override
+ public abstract void putFloats(long[] t, float[] v, BitMap bitMap, int
start, int end);
+
+ @Override
+ public abstract void putDoubles(long[] t, double[] v, BitMap bitMap, int
start, int end);
+
+ @Override
+ public abstract void putBinaries(long[] t, Binary[] v, BitMap bitMap, int
start, int end);
+
+ @Override
+ public abstract void putBooleans(long[] t, boolean[] v, BitMap bitMap, int
start, int end);
+
+ @Override
+ public abstract void putAlignedTablet(long[] t, Object[] v, BitMap[]
bitMaps, int start, int end);
+
+ @Override
+ public abstract void writeNonAlignedPoint(long insertTime, Object
objectValue);
+
+ @Override
+ public abstract void writeAlignedPoints(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema>
schemaList);
+
+ @Override
+ public abstract void writeNonAlignedTablet(
+ long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int
start, int end);
+
+ @Override
+ public abstract void writeAlignedTablet(
+ long[] times,
+ Object[] valueList,
+ BitMap[] bitMaps,
+ List<IMeasurementSchema> schemaList,
+ int start,
+ int end);
+
+ @Override
+ public abstract long count();
+
+ @Override
+ public abstract long rowCount();
+
+ @Override
+ public abstract IMeasurementSchema getSchema();
+
+ @Override
+ public abstract void sortTvListForFlush();
+
+ @Override
+ public abstract int delete(long lowerBound, long upperBound);
+
+ @Override
+ public abstract IChunkWriter createIChunkWriter();
+
+ @Override
+ public abstract void encode(BlockingQueue<Object> ioTaskQueue);
+
+ @Override
+ public abstract void release();
+
+ @Override
+ public abstract boolean isEmpty();
+
+ @Override
+ public abstract List<? extends TVList> getSortedList();
+
+ @Override
+ public abstract TVList getWorkingTVList();
+
+ @Override
+ public abstract void setWorkingTVList(TVList list);
+
+ @Override
+ public abstract void serializeToWAL(IWALByteBufferView buffer);
+
+ @Override
+ public abstract int serializedSize();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 49be365336f..c8acb12768f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -60,10 +57,10 @@ import java.util.concurrent.BlockingQueue;
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
-public class AlignedWritableMemChunk implements IWritableMemChunk {
+public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
private final Map<String, Integer> measurementIndexMap;
- private final List<TSDataType> dataTypes;
+ private List<TSDataType> dataTypes;
private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
private List<AlignedTVList> sortedList;
@@ -190,33 +187,12 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
}
protected void handoverAlignedTvList() {
- // ensure query contexts won't be removed from list during handover
process.
- list.lockQueryList();
- try {
- if (list.isSorted()) {
- sortedList.add(list);
- } else if (list.getQueryContextList().isEmpty()) {
- list.sort();
- sortedList.add(list);
- } else {
- QueryContext firstQuery = list.getQueryContextList().get(0);
- // reserve query memory
- if (firstQuery instanceof FragmentInstanceContext) {
- MemoryReservationManager memoryReservationManager =
- ((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
- }
- // update current TVList owner to first query in the list
- list.setOwnerQuery(firstQuery);
- // clone tv list
- AlignedTVList cloneList = list.clone();
- cloneList.sort();
- sortedList.add(cloneList);
- }
- } finally {
- list.unlockQueryList();
+ if (!list.isSorted()) {
+ list.sort();
}
- this.list = AlignedTVList.newAlignedList(dataTypes);
+ sortedList.add(list);
+ this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
+ this.dataTypes = list.getTsDataTypes();
}
@Override
@@ -248,112 +224,6 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
}
}
- /**
- * Check metadata of columns and return array that mapping existed metadata
to index of data
- * column.
- *
- * @param schemaListInInsertPlan Contains all existed schema in InsertPlan.
If some timeseries
- * have been deleted, there will be null in its slot.
- * @return columnIndexArray: schemaList[i] is schema of
columns[columnIndexArray[i]]
- */
- private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
- List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues,
BitMap[] bitMaps) {
- Object[] reorderedColumnValues = new Object[schemaList.size()];
- BitMap[] reorderedBitMaps = bitMaps == null ? null : new
BitMap[schemaList.size()];
- for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
- IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
- if (measurementSchema != null) {
- Integer index =
this.measurementIndexMap.get(measurementSchema.getMeasurementId());
- // Index is null means this measurement was not in this AlignedTVList
before.
- // We need to extend a new column in AlignedMemChunk and AlignedTVList.
- // And the reorderedColumnValues should extend one more column for the
new measurement
- if (index == null) {
- index =
- measurementIndexMap.isEmpty()
- ? 0
- : measurementIndexMap.values().stream()
- .mapToInt(Integer::intValue)
- .max()
- .getAsInt()
- + 1;
-
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementId(),
index);
- this.schemaList.add(schemaListInInsertPlan.get(i));
- this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
- reorderedColumnValues =
- Arrays.copyOf(reorderedColumnValues,
reorderedColumnValues.length + 1);
- if (reorderedBitMaps != null) {
- reorderedBitMaps = Arrays.copyOf(reorderedBitMaps,
reorderedBitMaps.length + 1);
- }
- }
- reorderedColumnValues[index] = columnValues[i];
- if (bitMaps != null) {
- reorderedBitMaps[index] = bitMaps[i];
- }
- }
- }
- return new Pair<>(reorderedColumnValues, reorderedBitMaps);
- }
-
- private void filterDeletedTimeStamp(
- AlignedTVList alignedTVList,
- List<List<TimeRange>> valueColumnsDeletionList,
- Map<Long, BitMap> timestampWithBitmap) {
- BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
-
- int rowCount = alignedTVList.rowCount();
- List<int[]> valueColumnDeleteCursor = new ArrayList<>();
- if (valueColumnsDeletionList != null) {
- valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new
int[] {0}));
- }
-
- for (int row = 0; row < rowCount; row++) {
- // the row is deleted
- if (allValueColDeletedMap != null &&
allValueColDeletedMap.isMarked(row)) {
- continue;
- }
- long timestamp = alignedTVList.getTime(row);
-
- BitMap bitMap = new BitMap(schemaList.size());
- for (int column = 0; column < schemaList.size(); column++) {
- if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row),
column)) {
- bitMap.mark(column);
- }
-
- // skip deleted row
- if (valueColumnsDeletionList != null
- && !valueColumnsDeletionList.isEmpty()
- && isPointDeleted(
- timestamp,
- valueColumnsDeletionList.get(column),
- valueColumnDeleteCursor.get(column))) {
- bitMap.mark(column);
- }
-
- // skip all-null row
- if (bitMap.isAllMarked()) {
- continue;
- }
- timestampWithBitmap.put(timestamp, bitMap);
- }
- }
- }
-
- public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList,
List<BitMap> bitMaps) {
- Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
-
- filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
- for (AlignedTVList alignedTVList : sortedList) {
- filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
- }
-
- List<Long> filteredTimestamps = new ArrayList<>();
- for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
- filteredTimestamps.add(entry.getKey());
- bitMaps.add(entry.getValue());
- }
- return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
- }
-
@Override
public AlignedTVList getWorkingTVList() {
return list;
@@ -408,40 +278,6 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
return minTime;
}
- @Override
- public synchronized TVList getSortedTvListForQuery() {
- sortTVList();
- // increase reference count
- list.increaseReferenceCount();
- return list;
- }
-
- @Override
- public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema>
schemaList) {
- sortTVList();
- // increase reference count
- list.increaseReferenceCount();
- List<Integer> columnIndexList = new ArrayList<>();
- List<TSDataType> dataTypeList = new ArrayList<>();
- for (IMeasurementSchema measurementSchema : schemaList) {
- columnIndexList.add(
-
measurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1));
- dataTypeList.add(measurementSchema.getType());
- }
- return list.getTvListByColumnIndex(columnIndexList, dataTypeList);
- }
-
- private void sortTVList() {
- // check reference count
- if ((list.getReferenceCount() > 0 && !list.isSorted())) {
- list = list.clone();
- }
-
- if (!list.isSorted()) {
- list.sort();
- }
- }
-
@Override
public synchronized void sortTvListForFlush() {
if (!list.isSorted()) {
@@ -784,28 +620,6 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
}
}
- private void maybeReleaseTvList(AlignedTVList alignedTvList) {
- alignedTvList.lockQueryList();
- try {
- if (alignedTvList.getQueryContextList().isEmpty()) {
- alignedTvList.clear();
- } else {
- QueryContext firstQuery = alignedTvList.getQueryContextList().get(0);
- // transfer memory from write process to read process. Here it
reserves read memory and
- // releaseFlushedMemTable will release write memory.
- if (firstQuery instanceof FragmentInstanceContext) {
- MemoryReservationManager memoryReservationManager =
- ((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(alignedTvList.calculateRamSize());
- }
- // update current TVList owner to first query in the list
- alignedTvList.setOwnerQuery(firstQuery);
- }
- } finally {
- alignedTvList.unlockQueryList();
- }
- }
-
@Override
public void release() {
maybeReleaseTvList(list);
@@ -914,4 +728,103 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
}
return columnIndexList;
}
+
+ /**
+ * Check metadata of columns and return array that mapping existed metadata
to index of data
+ * column.
+ *
+ * @param schemaListInInsertPlan Contains all existed schema in InsertPlan.
If some timeseries
+ * have been deleted, there will be null in its slot.
+ * @return columnIndexArray: schemaList[i] is schema of
columns[columnIndexArray[i]]
+ */
+ private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
+ List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues,
BitMap[] bitMaps) {
+ Object[] reorderedColumnValues = new Object[schemaList.size()];
+ BitMap[] reorderedBitMaps = bitMaps == null ? null : new
BitMap[schemaList.size()];
+ for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
+ IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
+ if (measurementSchema != null) {
+ Integer index =
this.measurementIndexMap.get(measurementSchema.getMeasurementId());
+ // Index is null means this measurement was not in this AlignedTVList
before.
+ // We need to extend a new column in AlignedMemChunk and AlignedTVList.
+ // And the reorderedColumnValues should extend one more column for the
new measurement
+ if (index == null) {
+ index = this.list.getTsDataTypes().size();
+
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementId(),
index);
+ this.schemaList.add(schemaListInInsertPlan.get(i));
+ this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+ reorderedColumnValues =
+ Arrays.copyOf(reorderedColumnValues,
reorderedColumnValues.length + 1);
+ if (reorderedBitMaps != null) {
+ reorderedBitMaps = Arrays.copyOf(reorderedBitMaps,
reorderedBitMaps.length + 1);
+ }
+ }
+ reorderedColumnValues[index] = columnValues[i];
+ if (bitMaps != null) {
+ reorderedBitMaps[index] = bitMaps[i];
+ }
+ }
+ }
+ return new Pair<>(reorderedColumnValues, reorderedBitMaps);
+ }
+
+ private void filterDeletedTimeStamp(
+ AlignedTVList alignedTVList,
+ List<List<TimeRange>> valueColumnsDeletionList,
+ Map<Long, BitMap> timestampWithBitmap) {
+ BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
+
+ int rowCount = alignedTVList.rowCount();
+ List<int[]> valueColumnDeleteCursor = new ArrayList<>();
+ if (valueColumnsDeletionList != null) {
+ valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new
int[] {0}));
+ }
+
+ for (int row = 0; row < rowCount; row++) {
+ // the row is deleted
+ if (allValueColDeletedMap != null &&
allValueColDeletedMap.isMarked(row)) {
+ continue;
+ }
+ long timestamp = alignedTVList.getTime(row);
+
+ BitMap bitMap = new BitMap(schemaList.size());
+ for (int column = 0; column < schemaList.size(); column++) {
+ if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row),
column)) {
+ bitMap.mark(column);
+ }
+
+ // skip deleted row
+ if (valueColumnsDeletionList != null
+ && !valueColumnsDeletionList.isEmpty()
+ && isPointDeleted(
+ timestamp,
+ valueColumnsDeletionList.get(column),
+ valueColumnDeleteCursor.get(column))) {
+ bitMap.mark(column);
+ }
+
+ // skip all-null row
+ if (bitMap.isAllMarked()) {
+ continue;
+ }
+ timestampWithBitmap.put(timestamp, bitMap);
+ }
+ }
+ }
+
+ public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList,
List<BitMap> bitMaps) {
+ Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+
+ filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
+ for (AlignedTVList alignedTVList : sortedList) {
+ filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
+ }
+
+ List<Long> filteredTimestamps = new ArrayList<>();
+ for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
+ filteredTimestamps.add(entry.getKey());
+ bitMaps.add(entry.getValue());
+ }
+ return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 21e715cbd50..25ed5214e51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -85,35 +85,6 @@ public interface IWritableMemChunk extends WALEntryValue {
IMeasurementSchema getSchema();
- /**
- * served for read requests.
- *
- * <p>if tv list has been sorted, just return reference of it
- *
- * <p>if tv list hasn't been sorted and has no reference, sort and return
reference of it
- *
- * <p>if tv list hasn't been sorted and has reference we should copy and
sort it, then return ths
- * list
- *
- * <p>the mechanism is just like copy on write
- *
- * <p>This interface should be synchronized for concurrent with
sortTvListForFlush
- *
- * @return sorted tv list
- */
- TVList getSortedTvListForQuery();
-
- /**
- * served for vector read requests.
- *
- * <p>the mechanism is just like copy on write
- *
- * <p>This interface should be synchronized for concurrent with
sortTvListForFlush
- *
- * @return sorted tv list
- */
- TVList getSortedTvListForQuery(List<IMeasurementSchema> schemaList);
-
/**
* served for flush requests. The logic is just same as
getSortedTVListForQuery, but without add
* reference count
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 3169c878486..99c0157b790 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -20,9 +20,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
@@ -53,7 +50,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
-public class WritableMemChunk implements IWritableMemChunk {
+public class WritableMemChunk extends AbstractWritableMemChunk {
private IMeasurementSchema schema;
private TVList list;
@@ -76,46 +73,10 @@ public class WritableMemChunk implements IWritableMemChunk {
private WritableMemChunk() {}
protected void handoverTvList() {
- // ensure query contexts won't be removed from list during handover
process.
- list.lockQueryList();
- try {
- if (list.isSorted()) {
- sortedList.add(list);
- } else if (list.getQueryContextList().isEmpty()) {
- list.sort();
- sortedList.add(list);
- } else {
- /*
- * +----------------------+
- * | MemTable |
- * | |
- * | +---------------+ | +----------+
- * | | sorted TVList | | +---+ Query |
- * | +------^--------+ | | +----------+
- * | | | |
- * +----------+-----------+ |
- * | Clone + Sort |
- * +-----+------+ |
- * | TVList | <---------+
- * +------------+
- */
- QueryContext firstQuery = list.getQueryContextList().get(0);
- // reserve query memory
- if (firstQuery instanceof FragmentInstanceContext) {
- MemoryReservationManager memoryReservationManager =
- ((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
- }
- // update current TVList owner to first query in the list
- list.setOwnerQuery(firstQuery);
- // clone tv list
- TVList cloneList = list.clone();
- cloneList.sort();
- sortedList.add(cloneList);
- }
- } finally {
- list.unlockQueryList();
+ if (!list.isSorted()) {
+ list.sort();
}
+ sortedList.add(list);
this.list = TVList.newList(schema.getType());
}
@@ -279,30 +240,6 @@ public class WritableMemChunk implements IWritableMemChunk
{
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
}
- @Override
- public synchronized TVList getSortedTvListForQuery() {
- sortTVList();
- // increase reference count
- list.increaseReferenceCount();
- return list;
- }
-
- @Override
- public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema>
measurementSchema) {
- throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
- }
-
- private void sortTVList() {
- // check reference count
- if ((list.getReferenceCount() > 0 && !list.isSorted())) {
- list = list.clone();
- }
-
- if (!list.isSorted()) {
- list.sort();
- }
- }
-
@Override
public synchronized void sortTvListForFlush() {
if (!list.isSorted()) {
@@ -310,43 +247,6 @@ public class WritableMemChunk implements IWritableMemChunk
{
}
}
- private void filterDeletedTimestamp(
- TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
- long lastTime = Long.MIN_VALUE;
- int[] deletionCursor = {0};
- int rowCount = tvlist.rowCount();
- for (int i = 0; i < rowCount; i++) {
- if (tvlist.getBitMap() != null &&
tvlist.isNullValue(tvlist.getValueIndex(i))) {
- continue;
- }
- long curTime = tvlist.getTime(i);
- if (deletionList != null
- && ModificationUtils.isPointDeleted(curTime, deletionList,
deletionCursor)) {
- continue;
- }
-
- if (i == rowCount - 1 || curTime != lastTime) {
- timestampList.add(curTime);
- }
- lastTime = curTime;
- }
- }
-
- public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
- List<Long> timestampList = new ArrayList<>();
- filterDeletedTimestamp(list, deletionList, timestampList);
- for (TVList tvList : sortedList) {
- filterDeletedTimestamp(tvList, deletionList, timestampList);
- }
-
- // remove duplicated time
- List<Long> distinctTimestamps =
timestampList.stream().distinct().collect(Collectors.toList());
- // sort timestamps
- long[] filteredTimestamps =
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
- Arrays.sort(filteredTimestamps);
- return filteredTimestamps;
- }
-
@Override
public TVList getWorkingTVList() {
return list;
@@ -646,34 +546,6 @@ public class WritableMemChunk implements IWritableMemChunk
{
}
}
- /**
- * Release process for memtable flush. Release the TVList if there is no
query on it, otherwise
- * set query owner and release the TVList until query finishes.
- *
- * @param tvList
- */
- private void maybeReleaseTvList(TVList tvList) {
- tvList.lockQueryList();
- try {
- if (tvList.getQueryContextList().isEmpty()) {
- tvList.clear();
- } else {
- QueryContext firstQuery = tvList.getQueryContextList().get(0);
- // transfer memory from write process to read process. Here it
reserves read memory and
- // releaseFlushedMemTable will release write memory.
- if (firstQuery instanceof FragmentInstanceContext) {
- MemoryReservationManager memoryReservationManager =
- ((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
- }
- // update current TVList owner to first query in the list
- tvList.setOwnerQuery(firstQuery);
- }
- } finally {
- tvList.unlockQueryList();
- }
- }
-
@Override
public void release() {
maybeReleaseTvList(list);
@@ -729,4 +601,41 @@ public class WritableMemChunk implements IWritableMemChunk
{
public List<TVList> getSortedList() {
return sortedList;
}
+
+ private void filterDeletedTimestamp(
+ TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
+ long lastTime = Long.MIN_VALUE;
+ int[] deletionCursor = {0};
+ int rowCount = tvlist.rowCount();
+ for (int i = 0; i < rowCount; i++) {
+ if (tvlist.getBitMap() != null &&
tvlist.isNullValue(tvlist.getValueIndex(i))) {
+ continue;
+ }
+ long curTime = tvlist.getTime(i);
+ if (deletionList != null
+ && ModificationUtils.isPointDeleted(curTime, deletionList,
deletionCursor)) {
+ continue;
+ }
+
+ if (i == rowCount - 1 || curTime != lastTime) {
+ timestampList.add(curTime);
+ }
+ lastTime = curTime;
+ }
+ }
+
+ public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
+ List<Long> timestampList = new ArrayList<>();
+ filterDeletedTimestamp(list, deletionList, timestampList);
+ for (TVList tvList : sortedList) {
+ filterDeletedTimestamp(tvList, deletionList, timestampList);
+ }
+
+ // remove duplicated time
+ List<Long> distinctTimestamps =
timestampList.stream().distinct().collect(Collectors.toList());
+ // sort timestamps
+ long[] filteredTimestamps =
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
+ Arrays.sort(filteredTimestamps);
+ return filteredTimestamps;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index f379f236290..95b5da1be24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -121,7 +121,7 @@ public abstract class AlignedTVList extends TVList {
}
}
}
- AlignedTVList alignedTvList = AlignedTVList.newAlignedList(dataTypeList);
+ AlignedTVList alignedTvList = AlignedTVList.newAlignedList(new
ArrayList<>(dataTypeList));
alignedTvList.timestamps = this.timestamps;
alignedTvList.indices = this.indices;
alignedTvList.values = values;
@@ -132,8 +132,8 @@ public abstract class AlignedTVList extends TVList {
}
@Override
- public AlignedTVList clone() {
- AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
+ public synchronized AlignedTVList clone() {
+ AlignedTVList cloneList = AlignedTVList.newAlignedList(new
ArrayList<>(dataTypes));
cloneAs(cloneList);
System.arraycopy(
memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0,
dataTypes.size());
@@ -330,10 +330,11 @@ public abstract class AlignedTVList extends TVList {
public void extendColumn(TSDataType dataType) {
if (bitMaps == null) {
- bitMaps = new ArrayList<>(values.size());
+ List<List<BitMap>> localBitMaps = new ArrayList<>(values.size());
for (int i = 0; i < values.size(); i++) {
- bitMaps.add(null);
+ localBitMaps.add(null);
}
+ bitMaps = localBitMaps;
}
List<Object> columnValue = new ArrayList<>();
List<BitMap> columnBitMaps = new ArrayList<>();
@@ -555,10 +556,11 @@ public abstract class AlignedTVList extends TVList {
public void deleteColumn(int columnIndex) {
if (bitMaps == null) {
- bitMaps = new ArrayList<>(dataTypes.size());
+ List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
for (int j = 0; j < dataTypes.size(); j++) {
- bitMaps.add(null);
+ localBitMaps.add(null);
}
+ bitMaps = localBitMaps;
}
if (bitMaps.get(columnIndex) == null) {
List<BitMap> columnBitMaps = new ArrayList<>();
@@ -568,6 +570,9 @@ public abstract class AlignedTVList extends TVList {
bitMaps.set(columnIndex, columnBitMaps);
}
for (int i = 0; i < bitMaps.get(columnIndex).size(); i++) {
+ if (bitMaps.get(columnIndex).get(i) == null) {
+ bitMaps.get(columnIndex).set(i, new BitMap(ARRAY_SIZE));
+ }
bitMaps.get(columnIndex).get(i).markAll();
}
}
@@ -783,10 +788,11 @@ public abstract class AlignedTVList extends TVList {
private void markNullValue(int columnIndex, int arrayIndex, int
elementIndex) {
// init BitMaps if doesn't have
if (bitMaps == null) {
- bitMaps = new ArrayList<>(dataTypes.size());
+ List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
for (int i = 0; i < dataTypes.size(); i++) {
- bitMaps.add(null);
+ localBitMaps.add(null);
}
+ bitMaps = localBitMaps;
}
// if the bitmap in columnIndex is null, init the bitmap of this column
from the beginning
@@ -1228,7 +1234,7 @@ public abstract class AlignedTVList extends TVList {
bitMaps[columnIndex] = bitMap;
}
- AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+ AlignedTVList tvList = AlignedTVList.newAlignedList(new
ArrayList<>(dataTypes));
tvList.putAlignedValues(times, values, bitMaps, 0, rowCount);
return tvList;
}
@@ -1335,8 +1341,8 @@ public abstract class AlignedTVList extends TVList {
private final BitMap allValueColDeletedMap;
private final List<TSDataType> dataTypeList;
private final List<Integer> columnIndexList;
- List<List<TimeRange>> valueColumnsDeletionList;
- private final Integer floatPrecision;
+ private final List<List<TimeRange>> valueColumnsDeletionList;
+ private final int floatPrecision;
private final List<TSEncoding> encodingList;
// remember the selected index of last not-null value for each column
during prepareNext phase
@@ -1362,7 +1368,7 @@ public abstract class AlignedTVList extends TVList {
: columnIndexList;
this.allValueColDeletedMap = getAllValueColDeletedMap();
this.valueColumnsDeletionList = valueColumnsDeletionList;
- this.floatPrecision = floatPrecision;
+ this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encodingList = encodingList;
this.selectedIndices = new int[dataTypeList.size()];
for (int i = 0; i < dataTypeList.size(); i++) {
@@ -1459,7 +1465,7 @@ public abstract class AlignedTVList extends TVList {
}
public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int
columnIndex) {
- int valueIndex = getValueIndex(index);
+ int valueIndex = getValueIndex(rowIndex);
if (valueIndex < 0 || valueIndex >= rows) {
return null;
}
@@ -1483,19 +1489,19 @@ public abstract class AlignedTVList extends TVList {
return TsPrimitiveType.getByType(
TSDataType.INT64, getLongByValueIndex(valueIndex,
validColumnIndex));
case FLOAT:
- return TsPrimitiveType.getByType(
- TSDataType.FLOAT,
- roundValueWithGivenPrecision(
- getFloatByValueIndex(valueIndex, validColumnIndex),
- floatPrecision,
- encodingList.get(columnIndex)));
+ float valueF = getFloatByValueIndex(valueIndex, validColumnIndex);
+ if (encodingList != null) {
+ valueF =
+ roundValueWithGivenPrecision(valueF, floatPrecision,
encodingList.get(columnIndex));
+ }
+ return TsPrimitiveType.getByType(TSDataType.FLOAT, valueF);
case DOUBLE:
- return TsPrimitiveType.getByType(
- TSDataType.DOUBLE,
- roundValueWithGivenPrecision(
- getDoubleByValueIndex(valueIndex, validColumnIndex),
- floatPrecision,
- encodingList.get(columnIndex)));
+ double valueD = getDoubleByValueIndex(valueIndex, validColumnIndex);
+ if (encodingList != null) {
+ valueD =
+ roundValueWithGivenPrecision(valueD, floatPrecision,
encodingList.get(columnIndex));
+ }
+ return TsPrimitiveType.getByType(TSDataType.DOUBLE, valueD);
case TEXT:
case BLOB:
case STRING:
@@ -1630,18 +1636,22 @@ public abstract class AlignedTVList extends TVList {
valueBuilder.writeLong(getLongByValueIndex(originRowIndex,
validColumnIndex));
break;
case FLOAT:
- valueBuilder.writeFloat(
- roundValueWithGivenPrecision(
- getFloatByValueIndex(originRowIndex, validColumnIndex),
- floatPrecision,
- encodingList.get(columnIndex)));
+ float valueF = getFloatByValueIndex(originRowIndex,
validColumnIndex);
+ if (encodingList != null) {
+ valueF =
+ roundValueWithGivenPrecision(
+ valueF, floatPrecision, encodingList.get(columnIndex));
+ }
+ valueBuilder.writeFloat(valueF);
break;
case DOUBLE:
- valueBuilder.writeDouble(
- roundValueWithGivenPrecision(
- getDoubleByValueIndex(originRowIndex, validColumnIndex),
- floatPrecision,
- encodingList.get(columnIndex)));
+ double valueD = getDoubleByValueIndex(originRowIndex,
validColumnIndex);
+ if (encodingList != null) {
+ valueD =
+ roundValueWithGivenPrecision(
+ valueD, floatPrecision, encodingList.get(columnIndex));
+ }
+ valueBuilder.writeDouble(valueD);
break;
case TEXT:
case BLOB:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index fd26eded1ac..dc4ff5529d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -64,7 +64,7 @@ public abstract class BinaryTVList extends TVList {
}
@Override
- public BinaryTVList clone() {
+ public synchronized BinaryTVList clone() {
BinaryTVList cloneList = BinaryTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 2a84c13f546..b8eb0e508bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -63,7 +63,7 @@ public abstract class BooleanTVList extends TVList {
}
@Override
- public BooleanTVList clone() {
+ public synchronized BooleanTVList clone() {
BooleanTVList cloneList = BooleanTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 5f82527ca76..f61995ef062 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -64,7 +64,7 @@ public abstract class DoubleTVList extends TVList {
}
@Override
- public DoubleTVList clone() {
+ public synchronized DoubleTVList clone() {
DoubleTVList cloneList = DoubleTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index de79955a2e7..3623fa49a3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -64,7 +64,7 @@ public abstract class FloatTVList extends TVList {
}
@Override
- public FloatTVList clone() {
+ public synchronized FloatTVList clone() {
FloatTVList cloneList = FloatTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index f3312ebd9c2..758cd64053b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -63,7 +63,7 @@ public abstract class IntTVList extends TVList {
}
@Override
- public IntTVList clone() {
+ public synchronized IntTVList clone() {
IntTVList cloneList = IntTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index bea59266fcd..7b4bd8d82d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -63,7 +63,7 @@ public abstract class LongTVList extends TVList {
}
@Override
- public LongTVList clone() {
+ public synchronized LongTVList clone() {
LongTVList cloneList = LongTVList.newList();
cloneAs(cloneList);
cloneBitMap(cloneList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
index e2ee1af4aca..5ef966f2d27 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
@@ -141,8 +141,9 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
continue;
}
+ int valueIndex =
alignedTVList.getValueIndex(currentRowIndex(columnIndex));
// null value
- if (alignedTVList.isNullValue(currentRowIndex(columnIndex),
validColumnIndex)) {
+ if (alignedTVList.isNullValue(valueIndex, validColumnIndex)) {
valueBuilder.appendNull();
continue;
}
@@ -150,22 +151,18 @@ public abstract class MultiAlignedTVListIterator
implements MemPointIterator {
switch (tsDataTypeList.get(columnIndex)) {
case BOOLEAN:
valueBuilder.writeBoolean(
- alignedTVList.getBooleanByValueIndex(
- currentRowIndex(columnIndex), validColumnIndex));
+ alignedTVList.getBooleanByValueIndex(valueIndex,
validColumnIndex));
break;
case INT32:
case DATE:
- valueBuilder.writeInt(
- alignedTVList.getIntByValueIndex(currentRowIndex(columnIndex),
validColumnIndex));
+ valueBuilder.writeInt(alignedTVList.getIntByValueIndex(valueIndex,
validColumnIndex));
break;
case INT64:
case TIMESTAMP:
- valueBuilder.writeLong(
-
alignedTVList.getLongByValueIndex(currentRowIndex(columnIndex),
validColumnIndex));
+
valueBuilder.writeLong(alignedTVList.getLongByValueIndex(valueIndex,
validColumnIndex));
break;
case FLOAT:
- float valueF =
-
alignedTVList.getFloatByValueIndex(currentRowIndex(columnIndex),
validColumnIndex);
+ float valueF = alignedTVList.getFloatByValueIndex(valueIndex,
validColumnIndex);
if (encodingList != null) {
valueF =
alignedTVList.roundValueWithGivenPrecision(
@@ -174,8 +171,7 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
valueBuilder.writeFloat(valueF);
break;
case DOUBLE:
- double valueD =
-
alignedTVList.getDoubleByValueIndex(currentRowIndex(columnIndex),
validColumnIndex);
+ double valueD = alignedTVList.getDoubleByValueIndex(valueIndex,
validColumnIndex);
if (encodingList != null) {
valueD =
alignedTVList.roundValueWithGivenPrecision(
@@ -187,8 +183,7 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
case BLOB:
case STRING:
valueBuilder.writeBinary(
- alignedTVList.getBinaryByValueIndex(
- currentRowIndex(columnIndex), validColumnIndex));
+ alignedTVList.getBinaryByValueIndex(valueIndex,
validColumnIndex));
break;
default:
throw new UnSupportedDataTypeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 6c88c1526f1..2a3e26f99c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -41,7 +41,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -70,8 +72,8 @@ public abstract class TVList implements WALEntryValue {
// lock to provide synchronization for query list
private final ReentrantLock queryListLock = new ReentrantLock();
- // list of query that this TVList is used
- protected final List<QueryContext> queryContextList;
+ // set of query that this TVList is used
+ protected final Set<QueryContext> queryContextSet;
// the owner query which is obligated to release the TVList.
// When it is null, the TVList is owned by insert thread and released after
flush.
@@ -93,7 +95,7 @@ public abstract class TVList implements WALEntryValue {
seqRowCount = 0;
maxTime = Long.MIN_VALUE;
minTime = Long.MAX_VALUE;
- queryContextList = new ArrayList<>();
+ queryContextSet = new HashSet<>();
referenceCount = new AtomicInteger();
}
@@ -242,10 +244,11 @@ public abstract class TVList implements WALEntryValue {
protected void markNullValue(int arrayIndex, int elementIndex) {
// init bitMap if doesn't have
if (bitMap == null) {
- bitMap = new ArrayList<>();
+ List<BitMap> localBitMap = new ArrayList<>();
for (int i = 0; i < timestamps.size(); i++) {
- bitMap.add(new BitMap(ARRAY_SIZE));
+ localBitMap.add(new BitMap(ARRAY_SIZE));
}
+ bitMap = localBitMap;
}
// if the bitmap in arrayIndex is null, init the bitmap
if (bitMap.get(arrayIndex) == null) {
@@ -417,7 +420,7 @@ public abstract class TVList implements WALEntryValue {
}
// common clone for both TVList and AlignedTVList
- protected synchronized void cloneAs(TVList cloneList) {
+ protected void cloneAs(TVList cloneList) {
// clone timestamps
for (long[] timestampArray : timestamps) {
cloneList.timestamps.add(cloneTime(timestampArray));
@@ -442,7 +445,7 @@ public abstract class TVList implements WALEntryValue {
sorted = true;
maxTime = Long.MIN_VALUE;
minTime = Long.MAX_VALUE;
- queryContextList.clear();
+ queryContextSet.clear();
ownerQuery = null;
clearTime();
clearValue();
@@ -620,8 +623,8 @@ public abstract class TVList implements WALEntryValue {
return ownerQuery;
}
- public List<QueryContext> getQueryContextList() {
- return queryContextList;
+ public Set<QueryContext> getQueryContextSet() {
+ return queryContextSet;
}
public List<BitMap> getBitMap() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 12498238f7d..572e6cd7ef8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -18,16 +18,33 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
+import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
+import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
@@ -50,6 +67,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -60,10 +78,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
public class PrimitiveMemTableTest {
+ private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
+ private static final int dataNodeId = 0;
String database = "root.test";
String dataRegionId = "1";
@@ -72,6 +94,7 @@ public class PrimitiveMemTableTest {
@Before
public void setUp() {
delta = Math.pow(0.1,
TSFileDescriptor.getInstance().getConfig().getFloatPrecision());
+ conf.setDataNodeId(dataNodeId);
}
@Test
@@ -583,4 +606,64 @@ public class PrimitiveMemTableTest {
memTable.serializeToWAL(walBuffer);
assertEquals(0, walBuffer.getBuffer().remaining());
}
+
+ @Test
+ public void testReleaseWithNotEnoughMemory() throws CpuNotEnoughException {
+ TSDataType dataType = TSDataType.INT32;
+ WritableMemChunk series =
+ new WritableMemChunk(new MeasurementSchema("s1", dataType,
TSEncoding.PLAIN));
+ int count = 100;
+ for (int i = 0; i < count; i++) {
+ series.writeNonAlignedPoint(i, i);
+ }
+
+ // mock MemoryNotEnoughException exception
+ TVList list = series.getWorkingTVList();
+
+ // mock MemoryReservationManager
+ MemoryReservationManager memoryReservationManager =
+ Mockito.mock(MemoryReservationManager.class);
+ Mockito.doThrow(new MemoryNotEnoughException(""))
+ .when(memoryReservationManager)
+ .reserveMemoryCumulatively(list.calculateRamSize());
+
+ // create FragmentInstanceId
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext queryContext =
+ createFragmentInstanceContext(instanceId, stateMachine,
memoryReservationManager);
+ queryContext.initializeNumOfDrivers(1);
+ DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ queryContext.setDataRegion(dataRegion);
+
+ list.getQueryContextSet().add(queryContext);
+ Map<TVList, Integer> tvlistMap = new HashMap<>();
+ tvlistMap.put(list, 100);
+ queryContext.addTVListToSet(tvlistMap);
+
+ // fragment instance execution
+ IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
+ List<IDriver> drivers = Collections.emptyList();
+ ISink sinkHandle = Mockito.mock(ISink.class);
+ MPPDataExchangeManager exchangeManager =
Mockito.mock(MPPDataExchangeManager.class);
+ FragmentInstanceExecution execution =
+ FragmentInstanceExecution.createFragmentInstanceExecution(
+ scheduler,
+ instanceId,
+ queryContext,
+ drivers,
+ sinkHandle,
+ stateMachine,
+ -1,
+ false,
+ exchangeManager);
+
+ queryContext.decrementNumOfUnClosedDriver();
+ series.release();
+ }
}