This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch batch_wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 82ba91908023c54f6083d73f76dfbb0f2b3f4d7a
Author: HTHou <[email protected]>
AuthorDate: Mon Apr 1 15:31:07 2024 +0800

    finishing
---
 .../plan/planner/plan/node/PlanNodeType.java       |  4 +
 .../planner/plan/node/write/InsertRowsNode.java    | 75 ++++++++++++++++++-
 .../db/storageengine/dataregion/DataRegion.java    | 26 ++++---
 .../dataregion/memtable/TsFileProcessor.java       | 86 ++++++++++++++++++++++
 .../dataregion/wal/buffer/WALEntryType.java        |  2 +
 .../dataregion/wal/buffer/WALInfoEntry.java        |  1 +
 .../dataregion/wal/node/IWALNode.java              |  4 +
 .../dataregion/wal/node/WALFakeNode.java           |  6 ++
 .../storageengine/dataregion/wal/node/WALNode.java |  7 ++
 .../wal/recover/file/TsFilePlanRedoer.java         | 25 +++++++
 .../file/UnsealedTsFileRecoverPerformer.java       |  4 +
 11 files changed, 230 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index bda66843a97..d76d4cdfb05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -238,6 +238,8 @@ public enum PlanNodeType {
         return InsertTabletNode.deserializeFromWAL(stream);
       case 14:
         return InsertRowNode.deserializeFromWAL(stream);
+      case 15:
+        return InsertRowsNode.deserializeFromWAL(stream);
       case 44:
         return DeleteDataNode.deserializeFromWAL(stream);
       default:
@@ -252,6 +254,8 @@ public enum PlanNodeType {
         return InsertTabletNode.deserializeFromWAL(buffer);
       case 14:
         return InsertRowNode.deserializeFromWAL(buffer);
+      case 15:
+        return InsertRowsNode.deserializeFromWAL(buffer);
       case 44:
         return DeleteDataNode.deserializeFromWAL(buffer);
       default:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 12e229470be..77c2de63d8e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -30,9 +30,12 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -43,7 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertRowsNode extends InsertNode {
+public class InsertRowsNode extends InsertNode implements WALEntryValue {
 
   /**
    * Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -265,4 +268,74 @@ public class InsertRowsNode extends InsertNode {
     this.progressIndex = progressIndex;
     insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setProgressIndex(progressIndex));
   }
+
+  // region serialize & deserialize methods for WAL
+  /**
+   * Serialized size for wal: one short for the plan node type plus the payload size
+   * computed by {@link #subSerializeSize()}.
+   */
+  @Override
+  public int serializedSize() {
+    final int payloadSize = subSerializeSize();
+    return Short.BYTES + payloadSize;
+  }
+
+  /** Payload size in bytes: an int row count plus the wal size of every contained row. */
+  private int subSerializeSize() {
+    final int rowsSize =
+        insertRowNodeList.stream().mapToInt(row -> row.serializedSize()).sum();
+    return Integer.BYTES + rowsSize;
+  }
+
+  /**
+   * Writes this node to the wal buffer: the {@code INSERT_ROWS} plan node type as a short,
+   * followed by the row count and every contained row.
+   *
+   * <p>NOTE(review): the previous comment stated that, compared with
+   * {@link #serialize(ByteBuffer)}, the wal form adds the search index and omits
+   * isNeedInferType; that layout is decided by the per-row serialization - confirm there.
+   *
+   * @param buffer wal buffer view that receives the serialized bytes
+   */
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    final short nodeType = PlanNodeType.INSERT_ROWS.getNodeType();
+    buffer.putShort(nodeType);
+    subSerialize(buffer);
+  }
+
+  /** Writes the row count as an int, then each contained row in list order. */
+  private void subSerialize(IWALByteBufferView buffer) {
+    final int rowCount = insertRowNodeList.size();
+    buffer.putInt(rowCount);
+    insertRowNodeList.forEach(row -> row.serializeToWAL(buffer));
+  }
+
+  /**
+   * Deserialize from wal: reads an int row count, then that many rows, registering each
+   * row under its position in the stream.
+   *
+   * @param stream - DataInputStream
+   * @return InsertRowsNode
+   * @throws IOException - If an I/O error occurs.
+   * @throws IllegalArgumentException - If meets illegal argument.
+   */
+  public static InsertRowsNode deserializeFromWAL(DataInputStream stream)
+      throws IOException {
+    // we do not store plan node id in wal entry
+    final InsertRowsNode rows = new InsertRowsNode(new PlanNodeId(""));
+    final int rowCount = stream.readInt();
+    int index = 0;
+    while (index < rowCount) {
+      final InsertRowNode row = InsertRowNode.deserializeFromWAL(stream);
+      rows.addOneInsertRowNode(row, index);
+      index++;
+    }
+    return rows;
+  }
+
+  /**
+   * Deserialize from wal: reads an int row count, then that many rows, registering each
+   * row under its position in the buffer.
+   *
+   * @param buffer - ByteBuffer
+   * @return InsertRowsNode
+   * @throws IllegalArgumentException - If meets illegal argument
+   */
+  public static InsertRowsNode deserializeFromWAL(ByteBuffer buffer) {
+    // we do not store plan node id in wal entry
+    final InsertRowsNode result = new InsertRowsNode(new PlanNodeId(""));
+    for (int index = 0, rowCount = buffer.getInt(); index < rowCount; index++) {
+      result.addOneInsertRowNode(InsertRowNode.deserializeFromWAL(buffer), index);
+    }
+    return result;
+  }
+  // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2a5602e315e..98e92d7f2fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1181,14 +1181,18 @@ public class DataRegion implements IDataRegionForQuery {
       if (tsFileProcessor == null) {
         continue;
       }
-      tsFileProcessorMap.compute(tsFileProcessor, (k, v) -> {
-        if (v == null) {
-          v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
-          v.setSearchIndex(insertRowNode.getSearchIndex());
-        }
-        v.addOneInsertRowNode(insertRowNode, v.getInsertRowNodeList().size());
-        return v;
-      });
+      int finalI = i;
+      tsFileProcessorMap.compute(
+          tsFileProcessor,
+          (k, v) -> {
+            if (v == null) {
+              v = new InsertRowsNode(insertRowsNode.getPlanNodeId());
+              v.setSearchIndex(insertRowNode.getSearchIndex());
+              v.setAligned(insertRowNode.isAligned());
+            }
+            v.addOneInsertRowNode(insertRowNode, finalI);
+            return v;
+          });
       try {
         tsFileProcessor.insert(insertRowNode, costsForMetrics);
       } catch (WriteProcessException e) {
@@ -1204,7 +1208,11 @@ public class DataRegion implements IDataRegionForQuery {
       try {
         tsFileProcessor.insert(subInsertRowsNode, costsForMetrics);
       } catch (WriteProcessException e) {
-        insertRowsNode.getResults().put(i, 
RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        insertRowsNode
+            .getResults()
+            .put(
+                subInsertRowsNode.getInsertRowNodeIndexList().get(0),
+                RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
       }
 
       if (entry.getKey().shouldFlush()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 71d93647a0d..e42249dae17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
@@ -317,6 +318,91 @@ public class TsFileProcessor {
     costsForMetrics[3] += System.nanoTime() - startTime;
   }
 
+  /**
+   * Inserts every row of the given batch into the working memtable, logging the whole batch
+   * to the wal with a single {@code walNode.log} call.
+   *
+   * <p>Steps: create the working memtable if absent, reserve memory row by row, log the
+   * batch and wait for the wal result, then insert the rows one by one while updating the
+   * TsFile resource's times and progress index. Every memory reservation made for the batch
+   * is released again if a later reservation or the wal write fails.
+   *
+   * @param insertRowsNode batch of rows to insert
+   * @param costsForMetrics elapsed-nanosecond accumulators: [0] memtable creation, [1]
+   *     memory control, [2] wal, [3] memtable insertion
+   * @throws WriteProcessException if the wal write fails, or if raised by the memory checks
+   */
+  public void insert(InsertRowsNode insertRowsNode, long[] costsForMetrics)
+      throws WriteProcessException {
+
+    if (workMemTable == null) {
+      long startTime = System.nanoTime();
+      createNewWorkingMemTable();
+      // recordCreateMemtableBlockCost
+      costsForMetrics[0] += System.nanoTime() - startTime;
+      WritingMetrics.getInstance()
+          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+    }
+
+    // One reservation per row is kept so that all of them can be released on failure;
+    // keeping only the latest one would leak the reservations of the preceding rows.
+    long[][] memIncrements = new long[insertRowsNode.getInsertRowNodeList().size()][];
+    int reservedRows = 0;
+
+    long memControlStartTime = System.nanoTime();
+    try {
+      for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+        // use each row's own alignment, matching how the row is inserted below
+        memIncrements[reservedRows] =
+            insertRowNode.isAligned()
+                ? checkAlignedMemCostAndAddToTspInfo(
+                    insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
+                    insertRowNode.getDataTypes(), insertRowNode.getValues())
+                : checkMemCostAndAddToTspInfo(
+                    insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
+                    insertRowNode.getDataTypes(), insertRowNode.getValues());
+        reservedRows++;
+      }
+    } catch (Exception e) {
+      // a later row failed its memory check: release what the earlier rows reserved
+      rollbackRowsMemoryInfo(memIncrements, reservedRows);
+      throw e;
+    }
+    // recordScheduleMemoryBlockCost
+    costsForMetrics[1] += System.nanoTime() - memControlStartTime;
+
+    long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
+    try {
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowsNode);
+      if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
+        throw walFlushListener.getCause();
+      }
+    } catch (Exception e) {
+      rollbackRowsMemoryInfo(memIncrements, reservedRows);
+      throw new WriteProcessException(
+          String.format(
+              "%s: %s write WAL failed",
+              storageGroupName, tsFileResource.getTsFile().getAbsolutePath()),
+          e);
+    } finally {
+      // recordScheduleWalCost
+      costsForMetrics[2] += System.nanoTime() - startTime;
+    }
+
+    startTime = System.nanoTime();
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+      PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(insertRowNode);
+      if (!insertRowNode.isGeneratedByPipe()) {
+        workMemTable.markAsNotGeneratedByPipe();
+      }
+      PipeInsertionDataNodeListener.getInstance()
+          .listenToInsertNode(
+              dataRegionInfo.getDataRegion().getDataRegionId(),
+              walFlushListener.getWalEntryHandler(),
+              insertRowNode,
+              tsFileResource);
+
+      if (insertRowNode.isAligned()) {
+        workMemTable.insertAlignedRow(insertRowNode);
+      } else {
+        workMemTable.insert(insertRowNode);
+      }
+
+      // update start time of this memtable
+      tsFileResource.updateStartTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
+      // for sequence tsfile, we update the endTime only when the file is prepared to be
+      // closed.
+      // for unsequence tsfile, we have to update the endTime for each insertion.
+      if (!sequence) {
+        tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
+      }
+
+      tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
+    }
+    // recordScheduleMemTableCost
+    costsForMetrics[3] += System.nanoTime() - startTime;
+  }
+
+  /** Releases the memory reserved for the first {@code count} rows of a batch. */
+  private void rollbackRowsMemoryInfo(long[][] rowIncrements, int count) {
+    for (int i = 0; i < count; i++) {
+      rollbackMemoryInfo(rowIncrements[i]);
+    }
+  }
+
   private void createNewWorkingMemTable() {
     workMemTable =
         MemTableManager.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index defaf07ffd6..c304d605695 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -38,6 +38,8 @@ public enum WALEntryType {
   DELETE_DATA_NODE((byte) 6),
   /** {@link 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint} */
   MEMORY_TABLE_CHECKPOINT((byte) 7),
+  /** {@link 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */
+  INSERT_ROWS_NODE((byte) 8),
   // endregion
   // region signal entry type
   // signal wal buffer has been closed
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
index e8fe5633f83..966361857f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
@@ -73,6 +73,7 @@ public class WALInfoEntry extends WALEntry {
             .serializeToWAL(buffer, tabletInfo.tabletStart, 
tabletInfo.tabletEnd);
         break;
       case INSERT_ROW_NODE:
+      case INSERT_ROWS_NODE:
       case DELETE_DATA_NODE:
       case MEMORY_TABLE_SNAPSHOT:
         value.serializeToWAL(buffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
index 251d2bdbcde..7e10c3a70c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -34,6 +35,9 @@ public interface IWALNode extends FlushListener, 
AutoCloseable, ConsensusReqRead
   /** Log InsertRowNode. */
   WALFlushListener log(long memTableId, InsertRowNode insertRowNode);
 
+  /** Log InsertRowsNode for the given memtable; the returned listener reports the result. */
+  WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode);
+
   /** Log InsertTabletNode. */
   WALFlushListener log(long memTableId, InsertTabletNode insertTabletNode, int 
start, int end);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
index 9c2f926c930..2902a6dcae9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
 
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
@@ -50,6 +51,11 @@ public class WALFakeNode implements IWALNode {
     return getResult();
   }
 
+  @Override
+  public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) {
+    return getResult(); // nothing is written here; both arguments are ignored
+  }
+
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index c89f2d8cb9c..57003b957af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -130,6 +131,12 @@ public class WALNode implements IWALNode {
     return log(walEntry);
   }
 
+  /** Wraps the whole batch of rows in one {@link WALInfoEntry} and logs that entry. */
+  @Override
+  public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) {
+    final WALEntry rowsEntry = new WALInfoEntry(memTableId, insertRowsNode);
+    return log(rowsEntry);
+  }
+
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 2ecb64e28d2..67addc2b5ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -111,6 +112,30 @@ public class TsFilePlanRedoer {
     }
   }
 
+  /**
+   * Replays every row of a logged {@link InsertRowsNode} into the recovery memtable.
+   *
+   * <p>Rows without valid measurements are skipped. When a TsFile resource is present, a
+   * row whose time is not later than the end time recorded for its device is skipped too.
+   * Skipping is per row: the remaining rows of the batch are still replayed.
+   *
+   * @param insertRowsNode the batch of rows read from the wal
+   */
+  void redoInsertRows(InsertRowsNode insertRowsNode) {
+    for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) {
+      if (!node.hasValidMeasurements()) {
+        continue;
+      }
+      if (tsFileResource != null) {
+        // orders of insert node is guaranteed by storage engine, just check time in the file
+        // the last chunk group may contain the same data with the logs, ignore such logs in
+        // seq file
+        long lastEndTime = tsFileResource.getEndTime(node.getDeviceID());
+        long minTimeInNode = node.getTime();
+        if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode) {
+          // only this row is already in the file; later rows may target other devices or
+          // later times, so keep going instead of returning from the whole batch
+          continue;
+        }
+      }
+      if (node.isAligned()) {
+        recoveryMemTable.insertAlignedRow(node);
+      } else {
+        recoveryMemTable.insert(node);
+      }
+    }
+  }
+
   void resetRecoveryMemTable(IMemTable memTable) {
     this.recoveryMemTable = memTable;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index d2b3fd07d9a..4d39dd30dfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
@@ -197,6 +198,9 @@ public class UnsealedTsFileRecoverPerformer extends 
AbstractTsFileRecoverPerform
         case INSERT_TABLET_NODE:
           walRedoer.redoInsert((InsertNode) walEntry.getValue());
           break;
+        case INSERT_ROWS_NODE:
+          walRedoer.redoInsertRows((InsertRowsNode) walEntry.getValue());
+          break;
         case DELETE_DATA_NODE:
           walRedoer.redoDelete((DeleteDataNode) walEntry.getValue());
           break;

Reply via email to