This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5e66c15c14b [To dev/1.3] fix: MemoryNotEnough exception & clone & delete issues (#15170)
5e66c15c14b is described below

commit 5e66c15c14b2c8a385a9ce8e56f7be3bf6e4b1fa
Author: shizy <[email protected]>
AuthorDate: Thu Mar 27 18:42:26 2025 +0800

    [To dev/1.3] fix: MemoryNotEnough exception & clone & delete issues (#15170)
    
    * fix: MemoryNotEnough exception when flushing try to release tvlist
    * retry release tvlist if memory not enough during flush
    * synchronized clone method for TVList
    * query context list -> query context set
    
    * fix: remove query context when abort finish
    
    * refactor: reorganize AbstractWritableMemChunk
    
    * fix: float/double value
    
    * fix: some testcase bugs
    
    * fix: delete column bug for aligned timeseries
    
    * fix: clone data types list when create new AlignedTVList
    
    * fix: bitmap is null when delete column
    
    * init bitmaps when it is null
    
    * format
    
    * fix: error from cherry-pick
    
    * set memchunk datatypes when handover
---
 .../it/schema/IoTDBDeleteAlignedTimeseriesIT.java  |  42 +++
 .../fragment/FragmentInstanceContext.java          |  42 ++-
 .../schemaregion/utils/ResourceByPathUtils.java    |  12 +-
 .../memtable/AbstractWritableMemChunk.java         | 214 +++++++++++++++
 .../memtable/AlignedWritableMemChunk.java          | 299 ++++++++-------------
 .../dataregion/memtable/IWritableMemChunk.java     |  29 --
 .../dataregion/memtable/WritableMemChunk.java      | 173 +++---------
 .../db/utils/datastructure/AlignedTVList.java      |  82 +++---
 .../iotdb/db/utils/datastructure/BinaryTVList.java |   2 +-
 .../db/utils/datastructure/BooleanTVList.java      |   2 +-
 .../iotdb/db/utils/datastructure/DoubleTVList.java |   2 +-
 .../iotdb/db/utils/datastructure/FloatTVList.java  |   2 +-
 .../iotdb/db/utils/datastructure/IntTVList.java    |   2 +-
 .../iotdb/db/utils/datastructure/LongTVList.java   |   2 +-
 .../datastructure/MultiAlignedTVListIterator.java  |  21 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  21 +-
 .../dataregion/memtable/PrimitiveMemTableTest.java |  83 ++++++
 17 files changed, 601 insertions(+), 429 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
index 3defa886f8d..79198e97429 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBDeleteAlignedTimeseriesIT.java
@@ -246,4 +246,46 @@ public class IoTDBDeleteAlignedTimeseriesIT extends 
AbstractSchemaIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void deleteTimeseriesAndCreateSameTypeTest2() throws Exception {
+    String[] retArray = new String[] {"1,4.0,", "2,8.0,"};
+    int cnt = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create aligned timeseries root.turbine1.d1(s1 FLOAT encoding=PLAIN compression=SNAPPY, "
+              + "s2 INT64 encoding=PLAIN compression=SNAPPY, s4 DOUBLE encoding=PLAIN compression=SNAPPY)");
+      statement.execute("INSERT INTO root.turbine1.d1(timestamp,s1,s2,s4) ALIGNED VALUES(1,1,2,4)");
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM root.turbine1.d1")) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+            builder.append(resultSet.getString(i)).append(",");
+          }
+          Assert.assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+      }
+      // delete series in the middle
+      statement.execute("DELETE timeseries root.turbine1.d1.s4");
+      statement.execute(
+          "INSERT INTO root.turbine1.d1(timestamp,s3,s4) ALIGNED VALUES(2,false,8.0)");
+      statement.execute("FLUSH");
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s4 FROM root.turbine1.d1")) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+            builder.append(resultSet.getString(i)).append(",");
+          }
+          Assert.assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 58f06bfb270..7e2c144b0fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -186,6 +186,22 @@ public class FragmentInstanceContext extends QueryContext {
     return instanceContext;
   }
 
+  @TestOnly
+  public static FragmentInstanceContext createFragmentInstanceContext(
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      MemoryReservationManager memoryReservationManager) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(
+            id,
+            stateMachine,
+            new SessionInfo(1, "test", ZoneId.systemDefault()),
+            memoryReservationManager);
+    instanceContext.initialize();
+    instanceContext.start();
+    return instanceContext;
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
@@ -218,6 +234,20 @@ public class FragmentInstanceContext extends QueryContext {
         new ThreadSafeMemoryReservationManager(id.getQueryId(), 
this.getClass().getName());
   }
 
+  private FragmentInstanceContext(
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      SessionInfo sessionInfo,
+      MemoryReservationManager memoryReservationManager) {
+    this.id = id;
+    this.stateMachine = stateMachine;
+    this.executionEndTime.set(END_TIME_INITIAL_VALUE);
+    this.sessionInfo = sessionInfo;
+    this.dataNodeQueryContextMap = null;
+    this.dataNodeQueryContext = null;
+    this.memoryReservationManager = memoryReservationManager;
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
@@ -651,11 +681,11 @@ public class FragmentInstanceContext extends QueryContext 
{
   private void releaseTVListOwnedByQuery() {
     for (TVList tvList : tvListSet) {
       tvList.lockQueryList();
-      List<QueryContext> queryContextList = tvList.getQueryContextList();
+      Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
       try {
-        queryContextList.remove(this);
+        queryContextSet.remove(this);
         if (tvList.getOwnerQuery() == this) {
-          if (queryContextList.isEmpty()) {
+          if (queryContextSet.isEmpty()) {
             LOGGER.debug(
                 "TVList {} is released by the query, FragmentInstance Id is 
{}",
                 tvList,
@@ -663,11 +693,13 @@ public class FragmentInstanceContext extends QueryContext 
{
             
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
             tvList.clear();
           } else {
+            FragmentInstanceContext queryContext =
+                (FragmentInstanceContext) queryContextSet.iterator().next();
             LOGGER.debug(
                 "TVList {} is now owned by another query, FragmentInstance Id 
is {}",
                 tvList,
-                ((FragmentInstanceContext) queryContextList.get(0)).getId());
-            tvList.setOwnerQuery(queryContextList.get(0));
+                queryContext.getId());
+            tvList.setOwnerQuery(queryContext);
           }
         }
       } finally {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index adba4a163dc..78dce2cff39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -135,7 +135,7 @@ public abstract class ResourceByPathUtils {
       try {
         LOGGER.debug(
             "Flushing/Working MemTable - add current query context to 
immutable TVList's query list");
-        tvList.getQueryContextList().add(context);
+        tvList.getQueryContextSet().add(context);
         tvListQueryMap.put(tvList, tvList.rowCount());
       } finally {
         tvList.unlockQueryList();
@@ -155,13 +155,13 @@ public abstract class ResourceByPathUtils {
       if (!isWorkMemTable) {
         LOGGER.debug(
             "Flushing MemTable - add current query context to mutable TVList's 
query list");
-        list.getQueryContextList().add(context);
+        list.getQueryContextSet().add(context);
         tvListQueryMap.put(list, list.rowCount());
       } else {
-        if (list.isSorted() || list.getQueryContextList().isEmpty()) {
+        if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
           LOGGER.debug(
               "Working MemTable - add current query context to mutable 
TVList's query list when it's sorted or no other query on it");
-          list.getQueryContextList().add(context);
+          list.getQueryContextSet().add(context);
           tvListQueryMap.put(list, list.rowCount());
         } else {
           /*
@@ -180,7 +180,7 @@ public abstract class ResourceByPathUtils {
            */
           LOGGER.debug(
               "Working MemTable - clone mutable TVList and replace old TVList 
in working MemTable");
-          QueryContext firstQuery = list.getQueryContextList().get(0);
+          QueryContext firstQuery = 
list.getQueryContextSet().iterator().next();
           // reserve query memory
           if (firstQuery instanceof FragmentInstanceContext) {
             MemoryReservationManager memoryReservationManager =
@@ -191,7 +191,7 @@ public abstract class ResourceByPathUtils {
 
           // clone TVList
           cloneList = list.clone();
-          cloneList.getQueryContextList().add(context);
+          cloneList.getQueryContextSet().add(context);
           tvListQueryMap.put(cloneList, cloneList.rowCount());
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
new file mode 100644
index 00000000000..1030f2f641a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.memtable;
+
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.datastructure.TVList;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
+  protected static long RETRY_INTERVAL_MS = 100L;
+  protected static long MAX_WAIT_QUERY_MS = 60 * 1000L;
+
+  /**
+   * Release the TVList if there is no query on it. Otherwise, it should set the first query as the
+   * owner. The TVList is not released until all queries finish. If it throws a memory-not-enough
+   * exception during owner transfer, retry the release process after 100ms. If the problem is still
+   * not solved in 60s, it starts to abort the first query, kick it out of the query list and retry.
+   * This
+   * method must ensure success because it's part of flushing.
+   *
+   * @param tvList
+   */
+  protected void maybeReleaseTvList(TVList tvList) {
+    long startTimeInMs = System.currentTimeMillis();
+    boolean succeed = false;
+    while (!succeed) {
+      try {
+        tryReleaseTvList(tvList);
+        succeed = true;
+      } catch (MemoryNotEnoughException ex) {
+        long waitQueryInMs = System.currentTimeMillis() - startTimeInMs;
+        if (waitQueryInMs > MAX_WAIT_QUERY_MS) {
+          // Abort first query in the list. When all queries in the list have been aborted,
+          // tryReleaseTvList will ensure succeed finally.
+          tvList.lockQueryList();
+          try {
+            // fail the first query
+            Iterator<QueryContext> iterator = tvList.getQueryContextSet().iterator();
+            if (iterator.hasNext()) {
+              FragmentInstanceContext firstQuery = (FragmentInstanceContext) iterator.next();
+              firstQuery.failed(
+                  new MemoryNotEnoughException(
+                      "Memory not enough to clone the tvlist during flush phase"));
+            }
+          } finally {
+            tvList.unlockQueryList();
+          }
+        }
+
+        // sleep 100ms to retry
+        try {
+          Thread.sleep(RETRY_INTERVAL_MS);
+        } catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  private void tryReleaseTvList(TVList tvList) {
+    tvList.lockQueryList();
+    try {
+      if (tvList.getQueryContextSet().isEmpty()) {
+        tvList.clear();
+      } else {
+        QueryContext firstQuery = tvList.getQueryContextSet().iterator().next();
+        // transfer memory from write process to read process. Here it reserves read memory and
+        // releaseFlushedMemTable will release write memory.
+        if (firstQuery instanceof FragmentInstanceContext) {
+          MemoryReservationManager memoryReservationManager =
+              ((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
+          memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
+        }
+        // update current TVList owner to first query in the list
+        tvList.setOwnerQuery(firstQuery);
+      }
+    } finally {
+      tvList.unlockQueryList();
+    }
+  }
+
+  @Override
+  public abstract void putLong(long t, long v);
+
+  @Override
+  public abstract void putInt(long t, int v);
+
+  @Override
+  public abstract void putFloat(long t, float v);
+
+  @Override
+  public abstract void putDouble(long t, double v);
+
+  @Override
+  public abstract void putBinary(long t, Binary v);
+
+  @Override
+  public abstract void putBoolean(long t, boolean v);
+
+  @Override
+  public abstract void putAlignedRow(long t, Object[] v);
+
+  @Override
+  public abstract void putLongs(long[] t, long[] v, BitMap bitMap, int start, 
int end);
+
+  @Override
+  public abstract void putInts(long[] t, int[] v, BitMap bitMap, int start, 
int end);
+
+  @Override
+  public abstract void putFloats(long[] t, float[] v, BitMap bitMap, int 
start, int end);
+
+  @Override
+  public abstract void putDoubles(long[] t, double[] v, BitMap bitMap, int 
start, int end);
+
+  @Override
+  public abstract void putBinaries(long[] t, Binary[] v, BitMap bitMap, int 
start, int end);
+
+  @Override
+  public abstract void putBooleans(long[] t, boolean[] v, BitMap bitMap, int 
start, int end);
+
+  @Override
+  public abstract void putAlignedTablet(long[] t, Object[] v, BitMap[] 
bitMaps, int start, int end);
+
+  @Override
+  public abstract void writeNonAlignedPoint(long insertTime, Object 
objectValue);
+
+  @Override
+  public abstract void writeAlignedPoints(
+      long insertTime, Object[] objectValue, List<IMeasurementSchema> 
schemaList);
+
+  @Override
+  public abstract void writeNonAlignedTablet(
+      long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int 
start, int end);
+
+  @Override
+  public abstract void writeAlignedTablet(
+      long[] times,
+      Object[] valueList,
+      BitMap[] bitMaps,
+      List<IMeasurementSchema> schemaList,
+      int start,
+      int end);
+
+  @Override
+  public abstract long count();
+
+  @Override
+  public abstract long rowCount();
+
+  @Override
+  public abstract IMeasurementSchema getSchema();
+
+  @Override
+  public abstract void sortTvListForFlush();
+
+  @Override
+  public abstract int delete(long lowerBound, long upperBound);
+
+  @Override
+  public abstract IChunkWriter createIChunkWriter();
+
+  @Override
+  public abstract void encode(BlockingQueue<Object> ioTaskQueue);
+
+  @Override
+  public abstract void release();
+
+  @Override
+  public abstract boolean isEmpty();
+
+  @Override
+  public abstract List<? extends TVList> getSortedList();
+
+  @Override
+  public abstract TVList getWorkingTVList();
+
+  @Override
+  public abstract void setWorkingTVList(TVList list);
+
+  @Override
+  public abstract void serializeToWAL(IWALByteBufferView buffer);
+
+  @Override
+  public abstract int serializedSize();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 49be365336f..c8acb12768f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -21,9 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
@@ -60,10 +57,10 @@ import java.util.concurrent.BlockingQueue;
 
 import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
 
-public class AlignedWritableMemChunk implements IWritableMemChunk {
+public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
 
   private final Map<String, Integer> measurementIndexMap;
-  private final List<TSDataType> dataTypes;
+  private List<TSDataType> dataTypes;
   private final List<IMeasurementSchema> schemaList;
   private AlignedTVList list;
   private List<AlignedTVList> sortedList;
@@ -190,33 +187,12 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   }
 
   protected void handoverAlignedTvList() {
-    // ensure query contexts won't be removed from list during handover 
process.
-    list.lockQueryList();
-    try {
-      if (list.isSorted()) {
-        sortedList.add(list);
-      } else if (list.getQueryContextList().isEmpty()) {
-        list.sort();
-        sortedList.add(list);
-      } else {
-        QueryContext firstQuery = list.getQueryContextList().get(0);
-        // reserve query memory
-        if (firstQuery instanceof FragmentInstanceContext) {
-          MemoryReservationManager memoryReservationManager =
-              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-          
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
-        }
-        // update current TVList owner to first query in the list
-        list.setOwnerQuery(firstQuery);
-        // clone tv list
-        AlignedTVList cloneList = list.clone();
-        cloneList.sort();
-        sortedList.add(cloneList);
-      }
-    } finally {
-      list.unlockQueryList();
+    if (!list.isSorted()) {
+      list.sort();
     }
-    this.list = AlignedTVList.newAlignedList(dataTypes);
+    sortedList.add(list);
+    this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
+    this.dataTypes = list.getTsDataTypes();
   }
 
   @Override
@@ -248,112 +224,6 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
   }
 
-  /**
-   * Check metadata of columns and return array that mapping existed metadata 
to index of data
-   * column.
-   *
-   * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. 
If some timeseries
-   *     have been deleted, there will be null in its slot.
-   * @return columnIndexArray: schemaList[i] is schema of 
columns[columnIndexArray[i]]
-   */
-  private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
-      List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, 
BitMap[] bitMaps) {
-    Object[] reorderedColumnValues = new Object[schemaList.size()];
-    BitMap[] reorderedBitMaps = bitMaps == null ? null : new 
BitMap[schemaList.size()];
-    for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
-      IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
-      if (measurementSchema != null) {
-        Integer index = 
this.measurementIndexMap.get(measurementSchema.getMeasurementId());
-        // Index is null means this measurement was not in this AlignedTVList 
before.
-        // We need to extend a new column in AlignedMemChunk and AlignedTVList.
-        // And the reorderedColumnValues should extend one more column for the 
new measurement
-        if (index == null) {
-          index =
-              measurementIndexMap.isEmpty()
-                  ? 0
-                  : measurementIndexMap.values().stream()
-                          .mapToInt(Integer::intValue)
-                          .max()
-                          .getAsInt()
-                      + 1;
-          
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementId(), 
index);
-          this.schemaList.add(schemaListInInsertPlan.get(i));
-          this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
-          reorderedColumnValues =
-              Arrays.copyOf(reorderedColumnValues, 
reorderedColumnValues.length + 1);
-          if (reorderedBitMaps != null) {
-            reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, 
reorderedBitMaps.length + 1);
-          }
-        }
-        reorderedColumnValues[index] = columnValues[i];
-        if (bitMaps != null) {
-          reorderedBitMaps[index] = bitMaps[i];
-        }
-      }
-    }
-    return new Pair<>(reorderedColumnValues, reorderedBitMaps);
-  }
-
-  private void filterDeletedTimeStamp(
-      AlignedTVList alignedTVList,
-      List<List<TimeRange>> valueColumnsDeletionList,
-      Map<Long, BitMap> timestampWithBitmap) {
-    BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
-
-    int rowCount = alignedTVList.rowCount();
-    List<int[]> valueColumnDeleteCursor = new ArrayList<>();
-    if (valueColumnsDeletionList != null) {
-      valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new 
int[] {0}));
-    }
-
-    for (int row = 0; row < rowCount; row++) {
-      // the row is deleted
-      if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(row)) {
-        continue;
-      }
-      long timestamp = alignedTVList.getTime(row);
-
-      BitMap bitMap = new BitMap(schemaList.size());
-      for (int column = 0; column < schemaList.size(); column++) {
-        if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), 
column)) {
-          bitMap.mark(column);
-        }
-
-        // skip deleted row
-        if (valueColumnsDeletionList != null
-            && !valueColumnsDeletionList.isEmpty()
-            && isPointDeleted(
-                timestamp,
-                valueColumnsDeletionList.get(column),
-                valueColumnDeleteCursor.get(column))) {
-          bitMap.mark(column);
-        }
-
-        // skip all-null row
-        if (bitMap.isAllMarked()) {
-          continue;
-        }
-        timestampWithBitmap.put(timestamp, bitMap);
-      }
-    }
-  }
-
-  public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList, 
List<BitMap> bitMaps) {
-    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
-
-    filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
-    for (AlignedTVList alignedTVList : sortedList) {
-      filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
-    }
-
-    List<Long> filteredTimestamps = new ArrayList<>();
-    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
-      filteredTimestamps.add(entry.getKey());
-      bitMaps.add(entry.getValue());
-    }
-    return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
-  }
-
   @Override
   public AlignedTVList getWorkingTVList() {
     return list;
@@ -408,40 +278,6 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     return minTime;
   }
 
-  @Override
-  public synchronized TVList getSortedTvListForQuery() {
-    sortTVList();
-    // increase reference count
-    list.increaseReferenceCount();
-    return list;
-  }
-
-  @Override
-  public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema> 
schemaList) {
-    sortTVList();
-    // increase reference count
-    list.increaseReferenceCount();
-    List<Integer> columnIndexList = new ArrayList<>();
-    List<TSDataType> dataTypeList = new ArrayList<>();
-    for (IMeasurementSchema measurementSchema : schemaList) {
-      columnIndexList.add(
-          
measurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1));
-      dataTypeList.add(measurementSchema.getType());
-    }
-    return list.getTvListByColumnIndex(columnIndexList, dataTypeList);
-  }
-
-  private void sortTVList() {
-    // check reference count
-    if ((list.getReferenceCount() > 0 && !list.isSorted())) {
-      list = list.clone();
-    }
-
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public synchronized void sortTvListForFlush() {
     if (!list.isSorted()) {
@@ -784,28 +620,6 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
   }
 
-  private void maybeReleaseTvList(AlignedTVList alignedTvList) {
-    alignedTvList.lockQueryList();
-    try {
-      if (alignedTvList.getQueryContextList().isEmpty()) {
-        alignedTvList.clear();
-      } else {
-        QueryContext firstQuery = alignedTvList.getQueryContextList().get(0);
-        // transfer memory from write process to read process. Here it 
reserves read memory and
-        // releaseFlushedMemTable will release write memory.
-        if (firstQuery instanceof FragmentInstanceContext) {
-          MemoryReservationManager memoryReservationManager =
-              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-          
memoryReservationManager.reserveMemoryCumulatively(alignedTvList.calculateRamSize());
-        }
-        // update current TVList owner to first query in the list
-        alignedTvList.setOwnerQuery(firstQuery);
-      }
-    } finally {
-      alignedTvList.unlockQueryList();
-    }
-  }
-
   @Override
   public void release() {
     maybeReleaseTvList(list);
@@ -914,4 +728,103 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     }
     return columnIndexList;
   }
+
+  /**
+   * Check metadata of columns and return array that mapping existed metadata 
to index of data
+   * column.
+   *
+   * @param schemaListInInsertPlan Contains all existed schema in InsertPlan. 
If some timeseries
+   *     have been deleted, there will be null in its slot.
+   * @return columnIndexArray: schemaList[i] is schema of 
columns[columnIndexArray[i]]
+   */
+  private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
+      List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, 
BitMap[] bitMaps) {
+    Object[] reorderedColumnValues = new Object[schemaList.size()];
+    BitMap[] reorderedBitMaps = bitMaps == null ? null : new 
BitMap[schemaList.size()];
+    for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
+      IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
+      if (measurementSchema != null) {
+        Integer index = 
this.measurementIndexMap.get(measurementSchema.getMeasurementId());
+        // Index is null means this measurement was not in this AlignedTVList 
before.
+        // We need to extend a new column in AlignedMemChunk and AlignedTVList.
+        // And the reorderedColumnValues should extend one more column for the 
new measurement
+        if (index == null) {
+          index = this.list.getTsDataTypes().size();
+          
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementId(), 
index);
+          this.schemaList.add(schemaListInInsertPlan.get(i));
+          this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
+          reorderedColumnValues =
+              Arrays.copyOf(reorderedColumnValues, 
reorderedColumnValues.length + 1);
+          if (reorderedBitMaps != null) {
+            reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, 
reorderedBitMaps.length + 1);
+          }
+        }
+        reorderedColumnValues[index] = columnValues[i];
+        if (bitMaps != null) {
+          reorderedBitMaps[index] = bitMaps[i];
+        }
+      }
+    }
+    return new Pair<>(reorderedColumnValues, reorderedBitMaps);
+  }
+
+  private void filterDeletedTimeStamp(
+      AlignedTVList alignedTVList,
+      List<List<TimeRange>> valueColumnsDeletionList,
+      Map<Long, BitMap> timestampWithBitmap) {
+    BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
+
+    int rowCount = alignedTVList.rowCount();
+    List<int[]> valueColumnDeleteCursor = new ArrayList<>();
+    if (valueColumnsDeletionList != null) {
+      valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new 
int[] {0}));
+    }
+
+    for (int row = 0; row < rowCount; row++) {
+      // the row is deleted
+      if (allValueColDeletedMap != null && 
allValueColDeletedMap.isMarked(row)) {
+        continue;
+      }
+      long timestamp = alignedTVList.getTime(row);
+
+      BitMap bitMap = new BitMap(schemaList.size());
+      for (int column = 0; column < schemaList.size(); column++) {
+        if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), 
column)) {
+          bitMap.mark(column);
+        }
+
+        // skip deleted row
+        if (valueColumnsDeletionList != null
+            && !valueColumnsDeletionList.isEmpty()
+            && isPointDeleted(
+                timestamp,
+                valueColumnsDeletionList.get(column),
+                valueColumnDeleteCursor.get(column))) {
+          bitMap.mark(column);
+        }
+
+        // skip all-null row
+        if (bitMap.isAllMarked()) {
+          continue;
+        }
+        timestampWithBitmap.put(timestamp, bitMap);
+      }
+    }
+  }
+
+  public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList, 
List<BitMap> bitMaps) {
+    Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
+
+    filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
+    for (AlignedTVList alignedTVList : sortedList) {
+      filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
+    }
+
+    List<Long> filteredTimestamps = new ArrayList<>();
+    for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
+      filteredTimestamps.add(entry.getKey());
+      bitMaps.add(entry.getValue());
+    }
+    return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 21e715cbd50..25ed5214e51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -85,35 +85,6 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   IMeasurementSchema getSchema();
 
-  /**
-   * served for read requests.
-   *
-   * <p>if tv list has been sorted, just return reference of it
-   *
-   * <p>if tv list hasn't been sorted and has no reference, sort and return 
reference of it
-   *
-   * <p>if tv list hasn't been sorted and has reference we should copy and 
sort it, then return ths
-   * list
-   *
-   * <p>the mechanism is just like copy on write
-   *
-   * <p>This interface should be synchronized for concurrent with 
sortTvListForFlush
-   *
-   * @return sorted tv list
-   */
-  TVList getSortedTvListForQuery();
-
-  /**
-   * served for vector read requests.
-   *
-   * <p>the mechanism is just like copy on write
-   *
-   * <p>This interface should be synchronized for concurrent with 
sortTvListForFlush
-   *
-   * @return sorted tv list
-   */
-  TVList getSortedTvListForQuery(List<IMeasurementSchema> schemaList);
-
   /**
    * served for flush requests. The logic is just same as 
getSortedTVListForQuery, but without add
    * reference count
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 3169c878486..99c0157b790 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -20,9 +20,6 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
-import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
@@ -53,7 +50,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
-public class WritableMemChunk implements IWritableMemChunk {
+public class WritableMemChunk extends AbstractWritableMemChunk {
 
   private IMeasurementSchema schema;
   private TVList list;
@@ -76,46 +73,10 @@ public class WritableMemChunk implements IWritableMemChunk {
   private WritableMemChunk() {}
 
   protected void handoverTvList() {
-    // ensure query contexts won't be removed from list during handover 
process.
-    list.lockQueryList();
-    try {
-      if (list.isSorted()) {
-        sortedList.add(list);
-      } else if (list.getQueryContextList().isEmpty()) {
-        list.sort();
-        sortedList.add(list);
-      } else {
-        /*
-         * +----------------------+
-         * |      MemTable        |
-         * |                      |
-         * |   +---------------+  |          +----------+
-         * |   | sorted TVList |  |      +---+   Query  |
-         * |   +------^--------+  |      |   +----------+
-         * |          |           |      |
-         * +----------+-----------+      |
-         *            | Clone + Sort     |
-         *      +-----+------+           |
-         *      |   TVList   | <---------+
-         *      +------------+
-         */
-        QueryContext firstQuery = list.getQueryContextList().get(0);
-        // reserve query memory
-        if (firstQuery instanceof FragmentInstanceContext) {
-          MemoryReservationManager memoryReservationManager =
-              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-          
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
-        }
-        // update current TVList owner to first query in the list
-        list.setOwnerQuery(firstQuery);
-        // clone tv list
-        TVList cloneList = list.clone();
-        cloneList.sort();
-        sortedList.add(cloneList);
-      }
-    } finally {
-      list.unlockQueryList();
+    if (!list.isSorted()) {
+      list.sort();
     }
+    sortedList.add(list);
     this.list = TVList.newList(schema.getType());
   }
 
@@ -279,30 +240,6 @@ public class WritableMemChunk implements IWritableMemChunk 
{
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
-  @Override
-  public synchronized TVList getSortedTvListForQuery() {
-    sortTVList();
-    // increase reference count
-    list.increaseReferenceCount();
-    return list;
-  }
-
-  @Override
-  public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema> 
measurementSchema) {
-    throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
list.getDataType());
-  }
-
-  private void sortTVList() {
-    // check reference count
-    if ((list.getReferenceCount() > 0 && !list.isSorted())) {
-      list = list.clone();
-    }
-
-    if (!list.isSorted()) {
-      list.sort();
-    }
-  }
-
   @Override
   public synchronized void sortTvListForFlush() {
     if (!list.isSorted()) {
@@ -310,43 +247,6 @@ public class WritableMemChunk implements IWritableMemChunk 
{
     }
   }
 
-  private void filterDeletedTimestamp(
-      TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
-    long lastTime = Long.MIN_VALUE;
-    int[] deletionCursor = {0};
-    int rowCount = tvlist.rowCount();
-    for (int i = 0; i < rowCount; i++) {
-      if (tvlist.getBitMap() != null && 
tvlist.isNullValue(tvlist.getValueIndex(i))) {
-        continue;
-      }
-      long curTime = tvlist.getTime(i);
-      if (deletionList != null
-          && ModificationUtils.isPointDeleted(curTime, deletionList, 
deletionCursor)) {
-        continue;
-      }
-
-      if (i == rowCount - 1 || curTime != lastTime) {
-        timestampList.add(curTime);
-      }
-      lastTime = curTime;
-    }
-  }
-
-  public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
-    List<Long> timestampList = new ArrayList<>();
-    filterDeletedTimestamp(list, deletionList, timestampList);
-    for (TVList tvList : sortedList) {
-      filterDeletedTimestamp(tvList, deletionList, timestampList);
-    }
-
-    // remove duplicated time
-    List<Long> distinctTimestamps = 
timestampList.stream().distinct().collect(Collectors.toList());
-    // sort timestamps
-    long[] filteredTimestamps = 
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
-    Arrays.sort(filteredTimestamps);
-    return filteredTimestamps;
-  }
-
   @Override
   public TVList getWorkingTVList() {
     return list;
@@ -646,34 +546,6 @@ public class WritableMemChunk implements IWritableMemChunk 
{
     }
   }
 
-  /**
-   * Release process for memtable flush. Release the TVList if there is no 
query on it, otherwise
-   * set query owner and release the TVList until query finishes.
-   *
-   * @param tvList
-   */
-  private void maybeReleaseTvList(TVList tvList) {
-    tvList.lockQueryList();
-    try {
-      if (tvList.getQueryContextList().isEmpty()) {
-        tvList.clear();
-      } else {
-        QueryContext firstQuery = tvList.getQueryContextList().get(0);
-        // transfer memory from write process to read process. Here it 
reserves read memory and
-        // releaseFlushedMemTable will release write memory.
-        if (firstQuery instanceof FragmentInstanceContext) {
-          MemoryReservationManager memoryReservationManager =
-              ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-          
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
-        }
-        // update current TVList owner to first query in the list
-        tvList.setOwnerQuery(firstQuery);
-      }
-    } finally {
-      tvList.unlockQueryList();
-    }
-  }
-
   @Override
   public void release() {
     maybeReleaseTvList(list);
@@ -729,4 +601,41 @@ public class WritableMemChunk implements IWritableMemChunk 
{
   public List<TVList> getSortedList() {
     return sortedList;
   }
+
+  private void filterDeletedTimestamp(
+      TVList tvlist, List<TimeRange> deletionList, List<Long> timestampList) {
+    long lastTime = Long.MIN_VALUE;
+    int[] deletionCursor = {0};
+    int rowCount = tvlist.rowCount();
+    for (int i = 0; i < rowCount; i++) {
+      if (tvlist.getBitMap() != null && 
tvlist.isNullValue(tvlist.getValueIndex(i))) {
+        continue;
+      }
+      long curTime = tvlist.getTime(i);
+      if (deletionList != null
+          && ModificationUtils.isPointDeleted(curTime, deletionList, 
deletionCursor)) {
+        continue;
+      }
+
+      if (i == rowCount - 1 || curTime != lastTime) {
+        timestampList.add(curTime);
+      }
+      lastTime = curTime;
+    }
+  }
+
+  public long[] getFilteredTimestamp(List<TimeRange> deletionList) {
+    List<Long> timestampList = new ArrayList<>();
+    filterDeletedTimestamp(list, deletionList, timestampList);
+    for (TVList tvList : sortedList) {
+      filterDeletedTimestamp(tvList, deletionList, timestampList);
+    }
+
+    // remove duplicated time
+    List<Long> distinctTimestamps = 
timestampList.stream().distinct().collect(Collectors.toList());
+    // sort timestamps
+    long[] filteredTimestamps = 
distinctTimestamps.stream().mapToLong(Long::longValue).toArray();
+    Arrays.sort(filteredTimestamps);
+    return filteredTimestamps;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index f379f236290..95b5da1be24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -121,7 +121,7 @@ public abstract class AlignedTVList extends TVList {
         }
       }
     }
-    AlignedTVList alignedTvList = AlignedTVList.newAlignedList(dataTypeList);
+    AlignedTVList alignedTvList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypeList));
     alignedTvList.timestamps = this.timestamps;
     alignedTvList.indices = this.indices;
     alignedTvList.values = values;
@@ -132,8 +132,8 @@ public abstract class AlignedTVList extends TVList {
   }
 
   @Override
-  public AlignedTVList clone() {
-    AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
+  public synchronized AlignedTVList clone() {
+    AlignedTVList cloneList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
     cloneAs(cloneList);
     System.arraycopy(
         memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, 
dataTypes.size());
@@ -330,10 +330,11 @@ public abstract class AlignedTVList extends TVList {
 
   public void extendColumn(TSDataType dataType) {
     if (bitMaps == null) {
-      bitMaps = new ArrayList<>(values.size());
+      List<List<BitMap>> localBitMaps = new ArrayList<>(values.size());
       for (int i = 0; i < values.size(); i++) {
-        bitMaps.add(null);
+        localBitMaps.add(null);
       }
+      bitMaps = localBitMaps;
     }
     List<Object> columnValue = new ArrayList<>();
     List<BitMap> columnBitMaps = new ArrayList<>();
@@ -555,10 +556,11 @@ public abstract class AlignedTVList extends TVList {
 
   public void deleteColumn(int columnIndex) {
     if (bitMaps == null) {
-      bitMaps = new ArrayList<>(dataTypes.size());
+      List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
       for (int j = 0; j < dataTypes.size(); j++) {
-        bitMaps.add(null);
+        localBitMaps.add(null);
       }
+      bitMaps = localBitMaps;
     }
     if (bitMaps.get(columnIndex) == null) {
       List<BitMap> columnBitMaps = new ArrayList<>();
@@ -568,6 +570,9 @@ public abstract class AlignedTVList extends TVList {
       bitMaps.set(columnIndex, columnBitMaps);
     }
     for (int i = 0; i < bitMaps.get(columnIndex).size(); i++) {
+      if (bitMaps.get(columnIndex).get(i) == null) {
+        bitMaps.get(columnIndex).set(i, new BitMap(ARRAY_SIZE));
+      }
       bitMaps.get(columnIndex).get(i).markAll();
     }
   }
@@ -783,10 +788,11 @@ public abstract class AlignedTVList extends TVList {
   private void markNullValue(int columnIndex, int arrayIndex, int 
elementIndex) {
     // init BitMaps if doesn't have
     if (bitMaps == null) {
-      bitMaps = new ArrayList<>(dataTypes.size());
+      List<List<BitMap>> localBitMaps = new ArrayList<>(dataTypes.size());
       for (int i = 0; i < dataTypes.size(); i++) {
-        bitMaps.add(null);
+        localBitMaps.add(null);
       }
+      bitMaps = localBitMaps;
     }
 
     // if the bitmap in columnIndex is null, init the bitmap of this column 
from the beginning
@@ -1228,7 +1234,7 @@ public abstract class AlignedTVList extends TVList {
       bitMaps[columnIndex] = bitMap;
     }
 
-    AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+    AlignedTVList tvList = AlignedTVList.newAlignedList(new 
ArrayList<>(dataTypes));
     tvList.putAlignedValues(times, values, bitMaps, 0, rowCount);
     return tvList;
   }
@@ -1335,8 +1341,8 @@ public abstract class AlignedTVList extends TVList {
     private final BitMap allValueColDeletedMap;
     private final List<TSDataType> dataTypeList;
     private final List<Integer> columnIndexList;
-    List<List<TimeRange>> valueColumnsDeletionList;
-    private final Integer floatPrecision;
+    private final List<List<TimeRange>> valueColumnsDeletionList;
+    private final int floatPrecision;
     private final List<TSEncoding> encodingList;
 
     // remember the selected index of last not-null value for each column 
during prepareNext phase
@@ -1362,7 +1368,7 @@ public abstract class AlignedTVList extends TVList {
               : columnIndexList;
       this.allValueColDeletedMap = getAllValueColDeletedMap();
       this.valueColumnsDeletionList = valueColumnsDeletionList;
-      this.floatPrecision = floatPrecision;
+      this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
       this.encodingList = encodingList;
       this.selectedIndices = new int[dataTypeList.size()];
       for (int i = 0; i < dataTypeList.size(); i++) {
@@ -1459,7 +1465,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int 
columnIndex) {
-      int valueIndex = getValueIndex(index);
+      int valueIndex = getValueIndex(rowIndex);
       if (valueIndex < 0 || valueIndex >= rows) {
         return null;
       }
@@ -1483,19 +1489,19 @@ public abstract class AlignedTVList extends TVList {
           return TsPrimitiveType.getByType(
               TSDataType.INT64, getLongByValueIndex(valueIndex, 
validColumnIndex));
         case FLOAT:
-          return TsPrimitiveType.getByType(
-              TSDataType.FLOAT,
-              roundValueWithGivenPrecision(
-                  getFloatByValueIndex(valueIndex, validColumnIndex),
-                  floatPrecision,
-                  encodingList.get(columnIndex)));
+          float valueF = getFloatByValueIndex(valueIndex, validColumnIndex);
+          if (encodingList != null) {
+            valueF =
+                roundValueWithGivenPrecision(valueF, floatPrecision, 
encodingList.get(columnIndex));
+          }
+          return TsPrimitiveType.getByType(TSDataType.FLOAT, valueF);
         case DOUBLE:
-          return TsPrimitiveType.getByType(
-              TSDataType.DOUBLE,
-              roundValueWithGivenPrecision(
-                  getDoubleByValueIndex(valueIndex, validColumnIndex),
-                  floatPrecision,
-                  encodingList.get(columnIndex)));
+          double valueD = getDoubleByValueIndex(valueIndex, validColumnIndex);
+          if (encodingList != null) {
+            valueD =
+                roundValueWithGivenPrecision(valueD, floatPrecision, 
encodingList.get(columnIndex));
+          }
+          return TsPrimitiveType.getByType(TSDataType.DOUBLE, valueD);
         case TEXT:
         case BLOB:
         case STRING:
@@ -1630,18 +1636,22 @@ public abstract class AlignedTVList extends TVList {
               valueBuilder.writeLong(getLongByValueIndex(originRowIndex, 
validColumnIndex));
               break;
             case FLOAT:
-              valueBuilder.writeFloat(
-                  roundValueWithGivenPrecision(
-                      getFloatByValueIndex(originRowIndex, validColumnIndex),
-                      floatPrecision,
-                      encodingList.get(columnIndex)));
+              float valueF = getFloatByValueIndex(originRowIndex, 
validColumnIndex);
+              if (encodingList != null) {
+                valueF =
+                    roundValueWithGivenPrecision(
+                        valueF, floatPrecision, encodingList.get(columnIndex));
+              }
+              valueBuilder.writeFloat(valueF);
               break;
             case DOUBLE:
-              valueBuilder.writeDouble(
-                  roundValueWithGivenPrecision(
-                      getDoubleByValueIndex(originRowIndex, validColumnIndex),
-                      floatPrecision,
-                      encodingList.get(columnIndex)));
+              double valueD = getDoubleByValueIndex(originRowIndex, 
validColumnIndex);
+              if (encodingList != null) {
+                valueD =
+                    roundValueWithGivenPrecision(
+                        valueD, floatPrecision, encodingList.get(columnIndex));
+              }
+              valueBuilder.writeDouble(valueD);
               break;
             case TEXT:
             case BLOB:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index fd26eded1ac..dc4ff5529d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -64,7 +64,7 @@ public abstract class BinaryTVList extends TVList {
   }
 
   @Override
-  public BinaryTVList clone() {
+  public synchronized BinaryTVList clone() {
     BinaryTVList cloneList = BinaryTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 2a84c13f546..b8eb0e508bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -63,7 +63,7 @@ public abstract class BooleanTVList extends TVList {
   }
 
   @Override
-  public BooleanTVList clone() {
+  public synchronized BooleanTVList clone() {
     BooleanTVList cloneList = BooleanTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 5f82527ca76..f61995ef062 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -64,7 +64,7 @@ public abstract class DoubleTVList extends TVList {
   }
 
   @Override
-  public DoubleTVList clone() {
+  public synchronized DoubleTVList clone() {
     DoubleTVList cloneList = DoubleTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index de79955a2e7..3623fa49a3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -64,7 +64,7 @@ public abstract class FloatTVList extends TVList {
   }
 
   @Override
-  public FloatTVList clone() {
+  public synchronized FloatTVList clone() {
     FloatTVList cloneList = FloatTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index f3312ebd9c2..758cd64053b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -63,7 +63,7 @@ public abstract class IntTVList extends TVList {
   }
 
   @Override
-  public IntTVList clone() {
+  public synchronized IntTVList clone() {
     IntTVList cloneList = IntTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index bea59266fcd..7b4bd8d82d2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -63,7 +63,7 @@ public abstract class LongTVList extends TVList {
   }
 
   @Override
-  public LongTVList clone() {
+  public synchronized LongTVList clone() {
     LongTVList cloneList = LongTVList.newList();
     cloneAs(cloneList);
     cloneBitMap(cloneList);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
index e2ee1af4aca..5ef966f2d27 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
@@ -141,8 +141,9 @@ public abstract class MultiAlignedTVListIterator implements 
MemPointIterator {
           continue;
         }
 
+        int valueIndex = 
alignedTVList.getValueIndex(currentRowIndex(columnIndex));
         // null value
-        if (alignedTVList.isNullValue(currentRowIndex(columnIndex), 
validColumnIndex)) {
+        if (alignedTVList.isNullValue(valueIndex, validColumnIndex)) {
           valueBuilder.appendNull();
           continue;
         }
@@ -150,22 +151,18 @@ public abstract class MultiAlignedTVListIterator 
implements MemPointIterator {
         switch (tsDataTypeList.get(columnIndex)) {
           case BOOLEAN:
             valueBuilder.writeBoolean(
-                alignedTVList.getBooleanByValueIndex(
-                    currentRowIndex(columnIndex), validColumnIndex));
+                alignedTVList.getBooleanByValueIndex(valueIndex, 
validColumnIndex));
             break;
           case INT32:
           case DATE:
-            valueBuilder.writeInt(
-                alignedTVList.getIntByValueIndex(currentRowIndex(columnIndex), 
validColumnIndex));
+            valueBuilder.writeInt(alignedTVList.getIntByValueIndex(valueIndex, 
validColumnIndex));
             break;
           case INT64:
           case TIMESTAMP:
-            valueBuilder.writeLong(
-                
alignedTVList.getLongByValueIndex(currentRowIndex(columnIndex), 
validColumnIndex));
+            
valueBuilder.writeLong(alignedTVList.getLongByValueIndex(valueIndex, 
validColumnIndex));
             break;
           case FLOAT:
-            float valueF =
-                
alignedTVList.getFloatByValueIndex(currentRowIndex(columnIndex), 
validColumnIndex);
+            float valueF = alignedTVList.getFloatByValueIndex(valueIndex, 
validColumnIndex);
             if (encodingList != null) {
               valueF =
                   alignedTVList.roundValueWithGivenPrecision(
@@ -174,8 +171,7 @@ public abstract class MultiAlignedTVListIterator implements 
MemPointIterator {
             valueBuilder.writeFloat(valueF);
             break;
           case DOUBLE:
-            double valueD =
-                
alignedTVList.getDoubleByValueIndex(currentRowIndex(columnIndex), 
validColumnIndex);
+            double valueD = alignedTVList.getDoubleByValueIndex(valueIndex, 
validColumnIndex);
             if (encodingList != null) {
               valueD =
                   alignedTVList.roundValueWithGivenPrecision(
@@ -187,8 +183,7 @@ public abstract class MultiAlignedTVListIterator implements 
MemPointIterator {
           case BLOB:
           case STRING:
             valueBuilder.writeBinary(
-                alignedTVList.getBinaryByValueIndex(
-                    currentRowIndex(columnIndex), validColumnIndex));
+                alignedTVList.getBinaryByValueIndex(valueIndex, 
validColumnIndex));
             break;
           default:
             throw new UnSupportedDataTypeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 6c88c1526f1..2a3e26f99c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -41,7 +41,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -70,8 +72,8 @@ public abstract class TVList implements WALEntryValue {
 
   // lock to provide synchronization for query list
   private final ReentrantLock queryListLock = new ReentrantLock();
-  // list of query that this TVList is used
-  protected final List<QueryContext> queryContextList;
+  // set of query that this TVList is used
+  protected final Set<QueryContext> queryContextSet;
 
   // the owner query which is obligated to release the TVList.
   // When it is null, the TVList is owned by insert thread and released after 
flush.
@@ -93,7 +95,7 @@ public abstract class TVList implements WALEntryValue {
     seqRowCount = 0;
     maxTime = Long.MIN_VALUE;
     minTime = Long.MAX_VALUE;
-    queryContextList = new ArrayList<>();
+    queryContextSet = new HashSet<>();
     referenceCount = new AtomicInteger();
   }
 
@@ -242,10 +244,11 @@ public abstract class TVList implements WALEntryValue {
   protected void markNullValue(int arrayIndex, int elementIndex) {
     // init bitMap if doesn't have
     if (bitMap == null) {
-      bitMap = new ArrayList<>();
+      List<BitMap> localBitMap = new ArrayList<>();
       for (int i = 0; i < timestamps.size(); i++) {
-        bitMap.add(new BitMap(ARRAY_SIZE));
+        localBitMap.add(new BitMap(ARRAY_SIZE));
       }
+      bitMap = localBitMap;
     }
     // if the bitmap in arrayIndex is null, init the bitmap
     if (bitMap.get(arrayIndex) == null) {
@@ -417,7 +420,7 @@ public abstract class TVList implements WALEntryValue {
   }
 
   // common clone for both TVList and AlignedTVList
-  protected synchronized void cloneAs(TVList cloneList) {
+  protected void cloneAs(TVList cloneList) {
     // clone timestamps
     for (long[] timestampArray : timestamps) {
       cloneList.timestamps.add(cloneTime(timestampArray));
@@ -442,7 +445,7 @@ public abstract class TVList implements WALEntryValue {
     sorted = true;
     maxTime = Long.MIN_VALUE;
     minTime = Long.MAX_VALUE;
-    queryContextList.clear();
+    queryContextSet.clear();
     ownerQuery = null;
     clearTime();
     clearValue();
@@ -620,8 +623,8 @@ public abstract class TVList implements WALEntryValue {
     return ownerQuery;
   }
 
-  public List<QueryContext> getQueryContextList() {
-    return queryContextList;
+  public Set<QueryContext> getQueryContextSet() {
+    return queryContextSet;
   }
 
   public List<BitMap> getBitMap() {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 12498238f7d..572e6cd7ef8 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -18,16 +18,33 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
+import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
+import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
@@ -50,6 +67,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -60,10 +78,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
 
 public class PrimitiveMemTableTest {
+  private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final int dataNodeId = 0;
 
   String database = "root.test";
   String dataRegionId = "1";
@@ -72,6 +94,7 @@ public class PrimitiveMemTableTest {
   @Before
   public void setUp() {
     delta = Math.pow(0.1, 
TSFileDescriptor.getInstance().getConfig().getFloatPrecision());
+    conf.setDataNodeId(dataNodeId);
   }
 
   @Test
@@ -583,4 +606,64 @@ public class PrimitiveMemTableTest {
     memTable.serializeToWAL(walBuffer);
     assertEquals(0, walBuffer.getBuffer().remaining());
   }
+
+  @Test
+  public void testReleaseWithNotEnoughMemory() throws CpuNotEnoughException {
+    TSDataType dataType = TSDataType.INT32;
+    WritableMemChunk series =
+        new WritableMemChunk(new MeasurementSchema("s1", dataType, 
TSEncoding.PLAIN));
+    int count = 100;
+    for (int i = 0; i < count; i++) {
+      series.writeNonAlignedPoint(i, i);
+    }
+
+    // mock MemoryNotEnoughException exception
+    TVList list = series.getWorkingTVList();
+
+    // mock MemoryReservationManager
+    MemoryReservationManager memoryReservationManager =
+        Mockito.mock(MemoryReservationManager.class);
+    Mockito.doThrow(new MemoryNotEnoughException(""))
+        .when(memoryReservationManager)
+        .reserveMemoryCumulatively(list.calculateRamSize());
+
+    // create FragmentInstanceId
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext queryContext =
+        createFragmentInstanceContext(instanceId, stateMachine, 
memoryReservationManager);
+    queryContext.initializeNumOfDrivers(1);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    queryContext.setDataRegion(dataRegion);
+
+    list.getQueryContextSet().add(queryContext);
+    Map<TVList, Integer> tvlistMap = new HashMap<>();
+    tvlistMap.put(list, 100);
+    queryContext.addTVListToSet(tvlistMap);
+
+    // fragment instance execution
+    IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
+    List<IDriver> drivers = Collections.emptyList();
+    ISink sinkHandle = Mockito.mock(ISink.class);
+    MPPDataExchangeManager exchangeManager = 
Mockito.mock(MPPDataExchangeManager.class);
+    FragmentInstanceExecution execution =
+        FragmentInstanceExecution.createFragmentInstanceExecution(
+            scheduler,
+            instanceId,
+            queryContext,
+            drivers,
+            sinkHandle,
+            stateMachine,
+            -1,
+            false,
+            exchangeManager);
+
+    queryContext.decrementNumOfUnClosedDriver();
+    series.release();
+  }
 }

Reply via email to