This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch wal-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9d210d472923cda6bfc7b22228848bd048af018d
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 20:04:53 2026 +0800

    Opti
---
 .../dataregion/DataExecutionVisitor.java           |   5 -
 .../planner/plan/node/write/InsertRowNode.java     |   6 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   6 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   6 +-
 .../plan/node/write/RelationalInsertRowNode.java   |   4 +-
 .../plan/node/write/RelationalInsertRowsNode.java  |   4 +-
 .../plan/planner/plan/node/write/SearchNode.java   |  37 ++++++
 .../db/storageengine/dataregion/DataRegion.java    |  98 ++++++++++++--
 .../storageengine/dataregion/wal/node/WALNode.java |  11 +-
 .../planner/node/write/InsertRowNodeSerdeTest.java |  90 +++++++++++++
 .../node/write/InsertRowsNodeSerdeTest.java        | 141 +++++++++++++++++++++
 .../node/write/InsertTabletNodeSerdeTest.java      |  67 ++++++++++
 .../wal/node/WALNodeWaitForRollFileTest.java       |  30 ++++-
 13 files changed, 470 insertions(+), 35 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 5de040eeba2..2e9a2dd4871 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -75,7 +75,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (OutOfTTLException e) {
       LOGGER.warn("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -99,7 +98,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertTablet(final InsertTabletNode node, final 
DataRegion dataRegion) {
     try {
       dataRegion.insertTablet(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (final OutOfTTLException e) {
       LOGGER.debug("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -136,7 +134,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -173,7 +170,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, 
DataRegion dataRegion) {
     try {
       dataRegion.insertTablets(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
@@ -208,7 +204,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
       InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 290ec8a4a71..6bc91acd008 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -619,7 +619,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort(getType().getNodeType());
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     subSerialize(buffer);
   }
 
@@ -700,7 +700,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   public static InsertRowNode deserializeFromWAL(DataInputStream stream) 
throws IOException {
     long searchIndex = stream.readLong();
     InsertRowNode insertNode = subDeserializeFromWAL(stream);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
@@ -791,7 +791,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
     long searchIndex = buffer.getLong();
     InsertRowNode insertNode = subDeserializeFromWAL(buffer);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 28f914e1606..2be39501781 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -349,7 +349,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort(getType().getNodeType());
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     subSerialize(buffer);
   }
 
@@ -377,7 +377,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
       InsertRowNode insertRowNode = 
InsertRowNode.subDeserializeFromWAL(stream);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
@@ -397,7 +397,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
       InsertRowNode insertRowNode = 
InsertRowNode.subDeserializeFromWAL(buffer);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 995e8a95e3f..25cf3c4fc7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -880,7 +880,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
   }
 
   void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     WALWriteUtils.write(targetPath.getFullPath(), buffer);
     // data types are serialized in measurement schemas
     writeMeasurementSchemas(buffer);
@@ -1012,7 +1012,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   protected void subDeserializeFromWAL(DataInputStream stream) throws 
IOException {
-    searchIndex = stream.readLong();
+    setSearchIndexFromWAL(stream.readLong());
     try {
       targetPath = readTargetPath(stream);
     } catch (IllegalPathException e) {
@@ -1047,7 +1047,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   protected void subDeserializeFromWAL(ByteBuffer buffer) {
-    searchIndex = buffer.getLong();
+    setSearchIndexFromWAL(buffer.getLong());
     try {
       targetPath = readTargetPath(buffer);
     } catch (IllegalPathException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index e622dba30b9..8ef6802f047 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -128,14 +128,14 @@ public class RelationalInsertRowNode extends 
InsertRowNode {
       throws IOException {
     long searchIndex = stream.readLong();
     RelationalInsertRowNode insertNode = subDeserializeFromWAL(stream);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
   public static RelationalInsertRowNode deserializeFromWAL(ByteBuffer buffer) {
     long searchIndex = buffer.getLong();
     RelationalInsertRowNode insertNode = subDeserializeFromWAL(buffer);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83498ceefc9..741c45f3256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -124,7 +124,7 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       RelationalInsertRowNode insertRowNode = 
RelationalInsertRowNode.subDeserializeFromWAL(stream);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
@@ -144,7 +144,7 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       RelationalInsertRowNode insertRowNode = 
RelationalInsertRowNode.subDeserializeFromWAL(buffer);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index 5a7e5d9cf78..7f7e5bc9458 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -28,6 +28,8 @@ import java.util.List;
 
 public abstract class SearchNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
+  private static final long LAST_FRAGMENT_MASK = Long.MIN_VALUE;
+
   /** this insert node doesn't need to participate in iot consensus */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
@@ -37,6 +39,8 @@ public abstract class SearchNode extends WritePlanNode 
implements ComparableCons
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected boolean isLastFragment = false;
+
   protected SearchNode(PlanNodeId id) {
     super(id);
   }
@@ -51,5 +55,38 @@ public abstract class SearchNode extends WritePlanNode 
implements ComparableCons
     return this;
   }
 
+  public boolean isLastFragment() {
+    return isLastFragment;
+  }
+
+  public SearchNode setLastFragment(boolean lastFragment) {
+    isLastFragment = lastFragment;
+    return this;
+  }
+
+  protected long getEncodedSearchIndex() {
+    if (searchIndex == NO_CONSENSUS_INDEX || !isLastFragment) {
+      return searchIndex;
+    }
+    return searchIndex | LAST_FRAGMENT_MASK;
+  }
+
+  public static long extractSearchIndex(long encodedSearchIndex) {
+    if (encodedSearchIndex == NO_CONSENSUS_INDEX) {
+      return encodedSearchIndex;
+    }
+    return encodedSearchIndex & ~LAST_FRAGMENT_MASK;
+  }
+
+  public static boolean isLastFragment(long encodedSearchIndex) {
+    return encodedSearchIndex != NO_CONSENSUS_INDEX
+        && (encodedSearchIndex & LAST_FRAGMENT_MASK) != 0;
+  }
+
+  protected void setSearchIndexFromWAL(long encodedSearchIndex) {
+    this.searchIndex = extractSearchIndex(encodedSearchIndex);
+    this.isLastFragment = isLastFragment(encodedSearchIndex);
+  }
+
   public abstract SearchNode merge(List<SearchNode> searchNodes);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a6808b8d972..a0e1732189e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1194,6 +1194,7 @@ public class DataRegion implements IDataRegionForQuery {
       // init map
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
       initFlushTimeMap(timePartitionId);
+      insertRowNode.setLastFragment(true);
 
       boolean isSequence =
           config.isEnableSeparateData()
@@ -1322,14 +1323,29 @@ public class DataRegion implements IDataRegionForQuery {
       InsertTabletNode insertTabletNode,
       Map<Long, List<int[]>[]> splitMap,
       TSStatus[] results,
-      long[] infoForMetrics)
+      long[] infoForMetrics,
+      boolean markLastFragmentOnFinalWrite)
       throws DataTypeInconsistentException {
     boolean noFailure = true;
+    int remainingFragmentCount = 0;
+    if (markLastFragmentOnFinalWrite) {
+      for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
+        List<int[]>[] rangeLists = entry.getValue();
+        if (rangeLists[1] != null) {
+          remainingFragmentCount++;
+        }
+        if (rangeLists[0] != null) {
+          remainingFragmentCount++;
+        }
+      }
+    }
     for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
       long timePartitionId = entry.getKey();
       List<int[]>[] rangeLists = entry.getValue();
       List<int[]> sequenceRangeList = rangeLists[1];
       if (sequenceRangeList != null) {
+        insertTabletNode.setLastFragment(
+            markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
         noFailure =
             insertTabletToTsFileProcessor(
                     insertTabletNode,
@@ -1340,9 +1356,12 @@ public class DataRegion implements IDataRegionForQuery {
                     noFailure,
                     infoForMetrics)
                 && noFailure;
+        remainingFragmentCount--;
       }
       List<int[]> unSequenceRangeList = rangeLists[0];
       if (unSequenceRangeList != null) {
+        insertTabletNode.setLastFragment(
+            markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
         noFailure =
             insertTabletToTsFileProcessor(
                     insertTabletNode,
@@ -1353,6 +1372,7 @@ public class DataRegion implements IDataRegionForQuery {
                     noFailure,
                     infoForMetrics)
                 && noFailure;
+        remainingFragmentCount--;
       }
     }
     return noFailure;
@@ -1391,7 +1411,7 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
-      boolean noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics);
+      boolean noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics, true);
       updateTsFileProcessorMetric(insertTabletNode, infoForMetrics);
 
       if (!noFailure) {
@@ -1407,7 +1427,8 @@ public class DataRegion implements IDataRegionForQuery {
       InsertTabletNode insertTabletNode,
       TSStatus[] results,
       long[] infoForMetrics,
-      List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs) {
+      List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs,
+      boolean markLastFragmentOnFinalWrite) {
     final int initialStart = start;
     try {
       Map<Long, List<int[]>[]> splitInfo = new HashMap<>();
@@ -1416,7 +1437,8 @@ public class DataRegion implements IDataRegionForQuery {
         split(insertTabletNode, start, end, splitInfo);
         start = end;
       }
-      return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+      return doInsert(
+          insertTabletNode, splitInfo, results, infoForMetrics, 
markLastFragmentOnFinalWrite);
     } catch (DataTypeInconsistentException e) {
       // the exception will trigger a flush, which requires the flush time to 
be recalculated
       start = initialStart;
@@ -1427,7 +1449,8 @@ public class DataRegion implements IDataRegionForQuery {
         start = end;
       }
       try {
-        return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+        return doInsert(
+            insertTabletNode, splitInfo, results, infoForMetrics, 
markLastFragmentOnFinalWrite);
       } catch (DataTypeInconsistentException ex) {
         logger.error("Data inconsistent exception is not supposed to be 
triggered twice", ex);
         return false;
@@ -1436,7 +1459,10 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private boolean executeInsertTablet(
-      InsertTabletNode insertTabletNode, TSStatus[] results, long[] 
infoForMetrics)
+      InsertTabletNode insertTabletNode,
+      TSStatus[] results,
+      long[] infoForMetrics,
+      boolean markLastFragmentOnFinalWrite)
       throws OutOfTTLException {
     boolean noFailure;
     int loc =
@@ -1447,7 +1473,13 @@ public class DataRegion implements IDataRegionForQuery {
     List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
         insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
     noFailure =
-        splitAndInsert(loc, insertTabletNode, results, infoForMetrics, 
deviceEndOffsetPairs)
+        splitAndInsert(
+                loc,
+                insertTabletNode,
+                results,
+                infoForMetrics,
+                deviceEndOffsetPairs,
+                markLastFragmentOnFinalWrite)
             && noFailure;
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
@@ -1460,6 +1492,25 @@ public class DataRegion implements IDataRegionForQuery {
     return noFailure;
   }
 
+  private int findLastInsertTabletIndexToMark(final InsertMultiTabletsNode 
insertMultiTabletsNode) {
+    for (int i = insertMultiTabletsNode.getInsertTabletNodeList().size() - 1; 
i >= 0; i--) {
+      final InsertTabletNode insertTabletNode =
+          insertMultiTabletsNode.getInsertTabletNodeList().get(i);
+      if (insertTabletNode.getRowCount() <= 0 || 
insertTabletNode.allMeasurementFailed()) {
+        continue;
+      }
+      if (!insertTabletNode.shouldCheckTTL()) {
+        return i;
+      }
+      final long[] times = insertTabletNode.getTimes();
+      if (times.length > 0
+          && CommonUtils.isAlive(times[times.length - 1], 
getTTL(insertTabletNode))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
   private void initFlushTimeMap(long timePartitionId) {
     if (config.isEnableSeparateData()
         && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
@@ -1741,7 +1792,10 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
-    for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
+    final List<Map.Entry<TsFileProcessor, InsertRowsNode>> groupedEntries =
+        new ArrayList<>(tsFileProcessorMap.entrySet());
+    markOnlyLastInsertFragment(groupedEntries, Map.Entry::getValue);
+    for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : groupedEntries) {
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
         List<TsFileProcessor> insertedProcessors =
@@ -1760,6 +1814,13 @@ public class DataRegion implements IDataRegionForQuery {
     return executedInsertRowNodeList;
   }
 
+  private <T> void markOnlyLastInsertFragment(
+      final List<T> fragments, final java.util.function.Function<T, 
InsertNode> fragmentExtractor) {
+    for (int i = 0; i < fragments.size(); i++) {
+      fragmentExtractor.apply(fragments.get(i)).setLastFragment(i == 
fragments.size() - 1);
+    }
+  }
+
   private List<TsFileProcessor> insertRowsWithTypeConsistencyCheck(
       TsFileProcessor tsFileProcessor, InsertRowsNode subInsertRowsNode, 
long[] infoForMetrics)
       throws WriteProcessException {
@@ -1853,8 +1914,17 @@ public class DataRegion implements IDataRegionForQuery {
           });
     }
 
+    final List<Entry<TsFileProcessor, InsertRowsNode>> retriedEntries =
+        new ArrayList<>(retriedProcessorMap.entrySet());
+    for (int i = 0; i < retriedEntries.size(); i++) {
+      retriedEntries
+          .get(i)
+          .getValue()
+          .setLastFragment(subInsertRowsNode.isLastFragment() && i == 
retriedEntries.size() - 1);
+    }
+
     final List<TsFileProcessor> insertedProcessors = new 
ArrayList<>(retriedProcessorMap.size());
-    for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : 
retriedProcessorMap.entrySet()) {
+    for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : retriedEntries) 
{
       final TsFileProcessor retriedProcessor = retriedEntry.getKey();
       registerToTsFile(retriedEntry.getValue(), retriedProcessor);
       retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
@@ -4587,7 +4657,10 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
-      for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
+      final List<Map.Entry<TsFileProcessor, InsertRowsNode>> groupedEntries =
+          new ArrayList<>(tsFileProcessorMap.entrySet());
+      markOnlyLastInsertFragment(groupedEntries, Map.Entry::getValue);
+      for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : groupedEntries) {
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
           List<TsFileProcessor> insertedProcessors =
@@ -4728,6 +4801,7 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumbe
+      final int lastTabletIndexToMark = 
findLastInsertTabletIndexToMark(insertMultiTabletsNode);
       for (int i = 0; i < 
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
         InsertTabletNode insertTabletNode = 
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
         long[] times = insertTabletNode.getTimes();
@@ -4741,7 +4815,9 @@ public class DataRegion implements IDataRegionForQuery {
         Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
         boolean noFailure = false;
         try {
-          noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics);
+          noFailure =
+              executeInsertTablet(
+                  insertTabletNode, results, infoForMetrics, i == 
lastTabletIndexToMark);
         } catch (WriteProcessException e) {
           insertMultiTabletsNode
               .getResults()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 98993c563f9..3a19909a6d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -687,7 +688,9 @@ public class WALNode implements IWALNode {
             if (type.needSearch()) {
               // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
               buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-              final long currentWalEntryIndex = buffer.getLong();
+              final long encodedSearchIndex = buffer.getLong();
+              final long currentWalEntryIndex = 
SearchNode.extractSearchIndex(encodedSearchIndex);
+              final boolean isLastFragment = 
SearchNode.isLastFragment(encodedSearchIndex);
               buffer.clear();
               if (currentWalEntryIndex == -1) {
                 // WAL entry of targetIndex has been fully collected, so put 
them into insertNodes
@@ -712,6 +715,9 @@ public class WALNode implements IWALNode {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
                   memorySize += buffer.remaining();
                 }
+                if (isLastFragment) {
+                  tryToCollectInsertNodeAndBumpIndex.run();
+                }
               } else {
                 // currentWalEntryIndex > targetIndex
                 // WAL entry of targetIndex has been fully collected, put them 
into insertNodes
@@ -740,6 +746,9 @@ public class WALNode implements IWALNode {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
                   memorySize += buffer.remaining();
                 }
+                if (isLastFragment) {
+                  tryToCollectInsertNodeAndBumpIndex.run();
+                }
               }
             } else {
               tryToCollectInsertNodeAndBumpIndex.run();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
index dbe69da2462..21bc9e0d120 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
@@ -23,10 +23,13 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,6 +38,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 public class InsertRowNodeSerdeTest {
 
@@ -72,6 +76,7 @@ public class InsertRowNodeSerdeTest {
   @Test
   public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, 
IOException {
     InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setLastFragment(true);
 
     int serializedSize = insertRowNode.serializedSize();
 
@@ -96,6 +101,67 @@ public class InsertRowNodeSerdeTest {
           new MeasurementSchema("s5", TSDataType.BOOLEAN)
         });
     Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, 
IOException {
+    InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertRowNode insertRowNode = 
getRelationalInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROW.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertRowNode tmpNode = 
RelationalInsertRowNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("id", TSDataType.STRING),
+          new MeasurementSchema("attr", TSDataType.TEXT),
+          new MeasurementSchema("value", TSDataType.INT64)
+        });
+    Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   private InsertRowNode getInsertRowNode() throws IllegalPathException {
@@ -199,4 +265,28 @@ public class InsertRowNodeSerdeTest {
     insertRowNode.setNeedInferType(true);
     return insertRowNode;
   }
+
+  private RelationalInsertRowNode getRelationalInsertRowNodeWithMeasurementSchemas() {
+    return new RelationalInsertRowNode(
+        new PlanNodeId("plannode 3"),
+        new PartialPath("table1", false),
+        false,
+        new String[] {"id", "attr", "value"},
+        new TSDataType[] {TSDataType.STRING, TSDataType.TEXT, TSDataType.INT64},
+        new MeasurementSchema[] {
+          new MeasurementSchema("id", TSDataType.STRING),
+          new MeasurementSchema("attr", TSDataType.TEXT),
+          new MeasurementSchema("value", TSDataType.INT64)
+        },
+        90L,
+        new Object[] {
+          new Binary("d1".getBytes(StandardCharsets.UTF_8)),
+          new Binary("v1".getBytes(StandardCharsets.UTF_8)),
+          1L
+        },
+        false,
+        new TsTableColumnCategory[] {
+          TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.FIELD
+        });
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
index 08819830557..034d4cb57de 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
@@ -129,6 +129,7 @@ public class InsertRowsNodeSerdeTest {
             new Object[] {2.0, false},
             false),
         1);
+    insertRowsNode.setLastFragment(true);
 
     int serializedSize = insertRowsNode.serializedSize();
 
@@ -145,6 +146,70 @@ public class InsertRowsNodeSerdeTest {
     InsertRowsNode tmpNode = InsertRowsNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
     Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, IOException {
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan node 1"));
+    insertRowsNode.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            new TSDataType[] {
+              TSDataType.DOUBLE,
+              TSDataType.FLOAT,
+              TSDataType.INT64,
+              TSDataType.TEXT,
+              TSDataType.STRING
+            },
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s2", TSDataType.FLOAT),
+              new MeasurementSchema("s3", TSDataType.INT64),
+              new MeasurementSchema("s4", TSDataType.TEXT),
+              new MeasurementSchema("s5", TSDataType.STRING)
+            },
+            1000L,
+            new Object[] {1.0, 2f, 300L, new Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+            false),
+        0);
+
+    insertRowsNode.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("root.sg.d2"),
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s4", TSDataType.BOOLEAN),
+            },
+            2000L,
+            new Object[] {2.0, false},
+            false),
+        1);
+    insertRowsNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowsNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowsNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_ROWS.getNodeType(), byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertRowsNode tmpNode = InsertRowsNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+    Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   @Test
@@ -262,6 +327,7 @@ public class InsertRowsNodeSerdeTest {
                 TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
               }),
           1);
+      insertRowsNode.setLastFragment(true);
 
       int serializedSize = insertRowsNode.serializedSize();
 
@@ -280,6 +346,81 @@ public class InsertRowsNodeSerdeTest {
           RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
       tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
       Assert.assertEquals(insertRowsNode, tmpNode);
+      Assert.assertTrue(tmpNode.isLastFragment());
     }
   }
+
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertRowsNode insertRowsNode =
+        new RelationalInsertRowsNode(new PlanNodeId("plan node 1"));
+    insertRowsNode.addOneInsertRowNode(
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("table1", false),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            new TSDataType[] {
+              TSDataType.DOUBLE,
+              TSDataType.FLOAT,
+              TSDataType.INT64,
+              TSDataType.TEXT,
+              TSDataType.STRING
+            },
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s2", TSDataType.FLOAT),
+              new MeasurementSchema("s3", TSDataType.INT64),
+              new MeasurementSchema("s4", TSDataType.TEXT),
+              new MeasurementSchema("s5", TSDataType.STRING)
+            },
+            1000L,
+            new Object[] {1.0, 2f, 300L, new Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG,
+              TsTableColumnCategory.ATTRIBUTE,
+              TsTableColumnCategory.FIELD,
+              TsTableColumnCategory.FIELD,
+              TsTableColumnCategory.FIELD
+            }),
+        0);
+
+    insertRowsNode.addOneInsertRowNode(
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("table1", false),
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s4", TSDataType.BOOLEAN),
+            },
+            2000L,
+            new Object[] {2.0, false},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
+            }),
+        1);
+    insertRowsNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowsNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowsNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROWS.getNodeType(), byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertRowsNode tmpNode = RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+    Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
index 4d8f5f6e4a8..ddc35e1eda9 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
@@ -68,6 +68,7 @@ public class InsertTabletNodeSerdeTest {
   @Test
   public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
     InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+    insertTabletNode.setLastFragment(true);
 
     int serializedSize = insertTabletNode.serializedSize();
 
@@ -93,6 +94,38 @@ public class InsertTabletNodeSerdeTest {
           new MeasurementSchema("s5", TSDataType.BOOLEAN)
         });
     Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, IOException {
+    InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+    insertTabletNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertTabletNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertTabletNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   @Test
@@ -126,6 +159,7 @@ public class InsertTabletNodeSerdeTest {
     for (String tableName : new String[] {"table1", "ta`ble1", "root.table1"}) {
       RelationalInsertTabletNode insertTabletNode =
           getRelationalInsertTabletNodeWithSchema(tableName);
+      insertTabletNode.setLastFragment(true);
 
       int serializedSize = insertTabletNode.serializedSize();
 
@@ -153,9 +187,42 @@ public class InsertTabletNodeSerdeTest {
             new MeasurementSchema("s5", TSDataType.BOOLEAN)
           });
       Assert.assertEquals(insertTabletNode, tmpNode);
+      Assert.assertTrue(tmpNode.isLastFragment());
     }
   }
 
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertTabletNode insertTabletNode = getRelationalInsertTabletNodeWithSchema("table1");
+    insertTabletNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertTabletNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertTabletNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_TABLET.getNodeType(), byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertTabletNode tmpNode =
+        RelationalInsertTabletNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
+
   @Test
   public void testInitTabletValuesWithAllTypes()
       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
index 2977841d5e6..c204a839b51 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -178,6 +177,29 @@ public class WALNodeWaitForRollFileTest {
     assertNotNull(iterator.next());
   }
 
+  @Test
+  public void testLegacySeparatorStillWorksAfterRollFile() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile");
+
+    InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
+    insertTabletNode.setSearchIndex(1);
+    walNode.log(
+        memTable.getMemTableId(),
+        insertTabletNode,
+        Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));
+    walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode());
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
+
+    walNode.rollWALFile();
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
+
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+    assertTrue(iterator.hasNext());
+    assertNotNull(iterator.next());
+  }
+
   /**
    * Verifies that waitForNextReady wakes up when a WAL file roll is triggered concurrently. A
    * background thread rolls the WAL file while the main thread waits on the iterator.
@@ -190,12 +212,11 @@ public class WALNodeWaitForRollFileTest {
     // write data with search index
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
     insertTabletNode.setSearchIndex(1);
+    insertTabletNode.setLastFragment(true);
     walNode.log(
         memTable.getMemTableId(),
         insertTabletNode,
         Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));
-    walNode.log(
-        memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
 
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
 
@@ -318,12 +339,11 @@ public class WALNodeWaitForRollFileTest {
     // write data with search index — stays in the current (active) WAL file
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1});
     insertTabletNode.setSearchIndex(1);
+    insertTabletNode.setLastFragment(true);
     walNode.log(
         memTable.getMemTableId(),
         insertTabletNode,
         Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()}));
-    walNode.log(
-        memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
 
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());
 

Reply via email to