This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a7dd10210a0413d66a5c2fdff81d8e263e5c162b
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Dec 10 18:42:28 2025 +0800

    Add execution of EvolveSchemaNode
---
 .../dataregion/DataExecutionVisitor.java           |  12 ++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 +++
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +
 .../plan/planner/plan/node/PlanVisitor.java        |  10 ++
 .../node/pipe/PipeEnrichedEvolveSchemaNode.java    | 194 +++++++++++++++++++++
 .../planner/plan/node/write/EvolveSchemaNode.java  |  32 ++--
 .../db/storageengine/dataregion/DataRegion.java    |  88 ++++++----
 .../dataregion/DeviceLastFlushTime.java            |   3 +-
 .../dataregion/HashLastFlushTimeMap.java           |   8 +-
 .../storageengine/dataregion/ILastFlushTime.java   |   1 +
 .../dataregion/ILastFlushTimeMap.java              |   1 +
 .../dataregion/PartitionLastFlushTime.java         |   1 +
 .../dataregion/task/DataRegionTask.java            |  17 +-
 .../dataregion/task/DataRegionTaskManager.java     |  28 +--
 .../dataregion/task/SchemaEvolutionTask.java       |   8 +-
 .../dataregion/tsfile/evolution/ColumnRename.java  |  25 ++-
 .../dataregion/tsfile/evolution/EvolvedSchema.java |  53 +++---
 .../tsfile/evolution/SchemaEvolution.java          |  13 +-
 .../tsfile/evolution/SchemaEvolutionFile.java      |   1 +
 .../dataregion/tsfile/evolution/TableRename.java   |  16 +-
 .../dataregion/tsfile/fileset/TsFileSet.java       |   5 +-
 .../java/org/apache/iotdb/db/utils/io/IOUtils.java |   4 +-
 .../storageengine/dataregion/DataRegionTest.java   | 128 +++++++++-----
 .../tsfile/evolution/EvolvedSchemaTest.java        |  12 +-
 .../tsfile/evolution/SchemaEvolutionFileTest.java  |  11 +-
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  81 ---------
 .../src/main/thrift/datanode.thrift                |   8 +
 27 files changed, 535 insertions(+), 256 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index cdce05b46c0..c3141b33c81 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -286,6 +287,17 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
     }
   }
 
+  @Override
+  public TSStatus visitEvolveSchemaNode(EvolveSchemaNode node, DataRegion 
dataRegion) {
+    try {
+      dataRegion.applySchemaEvolution(node.getSchemaEvolutions());
+      return StatusUtils.OK;
+    } catch (final IOException e) {
+      LOGGER.error("Error in executing plan node: {}", node, e);
+      return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+    }
+  }
+
   @Override
   public TSStatus visitPipeEnrichedDeleteDataNode(
       final PipeEnrichedDeleteDataNode node, final DataRegion context) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dcc62fcd3c8..b1fb833a38e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -158,8 +158,10 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
@@ -195,6 +197,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
 import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -233,6 +236,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteColumnDataReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
@@ -787,6 +791,25 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
                 .getStatus());
   }
 
+  @Override
+  public TSStatus evolveSchemaInDataRegion(final TDataRegionEvolveSchemaReq 
req) {
+    final List<SchemaEvolution> schemaEvolutions =
+        SchemaEvolution.createListFrom(req.schemaEvolutions);
+    return executeInternalSchemaTask(
+        req.getDataRegionIdList(),
+        consensusGroupId ->
+            new RegionWriteExecutor()
+                .execute(
+                    new DataRegionId(consensusGroupId.getId()),
+                    // The schema evolution plan may be re-collected here by pipe, resulting in
+                    // multiple transfers of the same plan. This is ignored for now.
+                    req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
+                        ? new PipeEnrichedEvolveSchemaNode(
+                            new EvolveSchemaNode(new PlanNodeId(""), 
schemaEvolutions))
+                        : new EvolveSchemaNode(new PlanNodeId(""), 
schemaEvolutions))
+                .getStatus());
+  }
+
   @Override
   public TSStatus deleteTimeSeries(final TDeleteTimeSeriesReq req) throws 
TException {
     final PathPatternTree patternTree =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 5a0bdcc3caf..95a121e61b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
@@ -321,6 +322,7 @@ public enum PlanNodeType {
   RELATIONAL_INSERT_ROWS((short) 2002),
   RELATIONAL_DELETE_DATA((short) 2003),
   EVOLVE_SCHEMA((short) 2004),
+  PIPE_ENRICHED_EVOLVE_SCHEMA((short) 2005),
   ;
 
   public static final int BYTES = Short.BYTES;
@@ -366,6 +368,8 @@ public enum PlanNodeType {
         return RelationalDeleteDataNode.deserializeFromWAL(stream);
       case 2004:
         return EvolveSchemaNode.deserializeFromWAL(stream);
+      case 2005:
+        return PipeEnrichedEvolveSchemaNode.deserializeFromWAL(stream);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
@@ -394,6 +398,8 @@ public enum PlanNodeType {
         return RelationalDeleteDataNode.deserializeFromWAL(buffer);
       case 2004:
         return EvolveSchemaNode.deserializeFromWAL(buffer);
+      case 2005:
+        return PipeEnrichedEvolveSchemaNode.deserializeFromWAL(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
@@ -721,6 +727,8 @@ public enum PlanNodeType {
         return RelationalDeleteDataNode.deserialize(buffer);
       case 2004:
         return EvolveSchemaNode.deserialize(buffer);
+      case 2005:
+        return PipeEnrichedEvolveSchemaNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 9e8e1834247..d9218b34774 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
@@ -111,6 +112,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueries
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -632,6 +634,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitEvolveSchemaNode(EvolveSchemaNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // Pipe Related Node
   
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -644,6 +650,10 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitPipeEnrichedEvolveSchemaNode(PipeEnrichedEvolveSchemaNode 
node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitPipeEnrichedWritePlanNode(PipeEnrichedWritePlanNode node, C 
context) {
     return visitPlan(node, context);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java
new file mode 100644
index 00000000000..794601eeb4e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PipeEnrichedEvolveSchemaNode extends EvolveSchemaNode {
+
+  private final EvolveSchemaNode evolveSchemaNode;
+
+  public PipeEnrichedEvolveSchemaNode(final EvolveSchemaNode evolveSchemaNode) 
{
+    super(evolveSchemaNode.getPlanNodeId(), 
evolveSchemaNode.getSchemaEvolutions());
+    this.evolveSchemaNode = evolveSchemaNode;
+  }
+
+  public PlanNode EvolveSchemaNode() {
+    return evolveSchemaNode;
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return evolveSchemaNode.isGeneratedByPipe();
+  }
+
+  @Override
+  public void markAsGeneratedByPipe() {
+    evolveSchemaNode.markAsGeneratedByPipe();
+  }
+
+  @Override
+  public PlanNodeId getPlanNodeId() {
+    return evolveSchemaNode.getPlanNodeId();
+  }
+
+  @Override
+  public void setPlanNodeId(final PlanNodeId id) {
+    evolveSchemaNode.setPlanNodeId(id);
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return evolveSchemaNode.getProgressIndex();
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    evolveSchemaNode.setProgressIndex(progressIndex);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return evolveSchemaNode.getChildren();
+  }
+
+  @Override
+  public void addChild(final PlanNode child) {
+    evolveSchemaNode.addChild(child);
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) 
evolveSchemaNode.clone());
+  }
+
+  @Override
+  public PlanNode createSubNode(final int subNodeId, final int startIndex, 
final int endIndex) {
+    return new PipeEnrichedEvolveSchemaNode(
+        (EvolveSchemaNode) evolveSchemaNode.createSubNode(subNodeId, 
startIndex, endIndex));
+  }
+
+  @Override
+  public PlanNode cloneWithChildren(final List<PlanNode> children) {
+    return new PipeEnrichedEvolveSchemaNode(
+        (EvolveSchemaNode) evolveSchemaNode.cloneWithChildren(children));
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return evolveSchemaNode.allowedChildCount();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return evolveSchemaNode.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(final PlanVisitor<R, C> visitor, final C context) {
+    return visitor.visitPipeEnrichedEvolveSchemaNode(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(final ByteBuffer byteBuffer) {
+    PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA.serialize(byteBuffer);
+    evolveSchemaNode.serialize(byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(final DataOutputStream stream) throws 
IOException {
+    PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA.serialize(stream);
+    evolveSchemaNode.serialize(stream);
+  }
+
+  public static PipeEnrichedEvolveSchemaNode deserialize(final ByteBuffer 
buffer) {
+    return new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) 
PlanNodeType.deserialize(buffer));
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    return o instanceof PipeEnrichedEvolveSchemaNode
+        && evolveSchemaNode.equals(((PipeEnrichedEvolveSchemaNode) 
o).evolveSchemaNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return evolveSchemaNode.hashCode();
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return evolveSchemaNode.getRegionReplicaSet();
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(final IAnalysis analysis) {
+    return evolveSchemaNode.splitByPartition(analysis).stream()
+        .map(
+            plan ->
+                plan instanceof PipeEnrichedEvolveSchemaNode
+                    ? plan
+                    : new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) 
plan))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void serializeToWAL(final IWALByteBufferView buffer) {
+    evolveSchemaNode.serializeToWAL(buffer);
+  }
+
+  @Override
+  public int serializedSize() {
+    return evolveSchemaNode.serializedSize();
+  }
+
+  @Override
+  public SearchNode merge(List<SearchNode> searchNodes) {
+    List<SearchNode> unrichedNodes =
+        searchNodes.stream()
+            .map(
+                searchNode ->
+                    (SearchNode) ((PipeEnrichedEvolveSchemaNode) 
searchNode).EvolveSchemaNode())
+            .collect(Collectors.toList());
+    return new PipeEnrichedEvolveSchemaNode(
+        (EvolveSchemaNode) evolveSchemaNode.merge(unrichedNodes));
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
index f85ccbfe17a..b4fc82e0710 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
@@ -19,40 +19,41 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.utils.io.IOUtils;
+
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class EvolveSchemaNode extends SearchNode implements WALEntryValue {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(EvolveSchemaNode.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(EvolveSchemaNode.class);
 
   protected TRegionReplicaSet regionReplicaSet;
   protected ProgressIndex progressIndex;
   private final List<SchemaEvolution> schemaEvolutions;
 
-  public EvolveSchemaNode(PlanNodeId id,
-      List<SchemaEvolution> schemaEvolutions) {
+  public EvolveSchemaNode(PlanNodeId id, List<SchemaEvolution> 
schemaEvolutions) {
     super(id);
     this.schemaEvolutions = schemaEvolutions;
   }
@@ -176,4 +177,13 @@ public class EvolveSchemaNode extends SearchNode 
implements WALEntryValue {
   public int serializedSize() {
     return 0;
   }
+
+  public List<SchemaEvolution> getSchemaEvolutions() {
+    return schemaEvolutions;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitEvolveSchemaNode(this, context);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index b2a340d14bd..c37b84d22b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -112,6 +112,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
@@ -128,8 +129,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
@@ -148,6 +151,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
 import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
+import 
org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector.DiskDirectorySelector;
 import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -167,10 +171,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.external.commons.io.FileUtils;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.fileSystem.FSType;
 import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
@@ -452,7 +456,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private void initDiskSelector() {
-    final ILoadDiskSelector.DiskDirectorySelector selector =
+    final DiskDirectorySelector selector =
         (sourceDirectory, fileName, tierLevel) -> {
           try {
             return TierManager.getInstance()
@@ -1021,15 +1025,12 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private List<TsFileSet> recoverTsFileSets(
-      long partitionId,
-      Map<Long, List<TsFileSet>> tsFileSetMap
-  ) {
+      long partitionId, Map<Long, List<TsFileSet>> tsFileSetMap) {
     List<TsFileSet> tsFileSets =
         tsFileSetMap.computeIfAbsent(
             partitionId,
             pid -> {
-              File fileSetDir =
-                  new File(getFileSetsDir(partitionId));
+              File fileSetDir = new File(getFileSetsDir(partitionId));
               File[] fileSets = fileSetDir.listFiles();
               if (fileSets == null || fileSets.length == 0) {
                 return Collections.emptyList();
@@ -1040,9 +1041,7 @@ public class DataRegion implements IDataRegionForQuery {
                   try {
                     tsFileSet =
                         new TsFileSet(
-                            Long.parseLong(fileSet.getName()),
-                            fileSetDir.getAbsolutePath(),
-                            true);
+                            Long.parseLong(fileSet.getName()), 
fileSetDir.getAbsolutePath(), true);
                   } catch (NumberFormatException e) {
                     continue;
                   }
@@ -1058,7 +1057,6 @@ public class DataRegion implements IDataRegionForQuery {
     return tsFileSets;
   }
 
-
   private Callable<Void> recoverFilesInPartition(
       long partitionId,
       DataRegionRecoveryContext context,
@@ -1234,23 +1232,48 @@ public class DataRegion implements IDataRegionForQuery {
     for (Entry<Long, Long> partitionVersionEntry : 
partitionMaxFileVersions.entrySet()) {
       long partitionId = partitionVersionEntry.getKey();
       long maxVersion = partitionVersionEntry.getValue();
-      lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> {
-        if (lastSet == null) {
-          lastSet = createNewFileSet(maxVersion, partitionId);
-        } else if (lastSet.getEndVersion() < maxVersion) {
-          lastSet = createNewFileSet(maxVersion, partitionId);
-        }
-        try {
-          lastSet.appendSchemaEvolution(schemaEvolutions);
-        } catch (IOException e) {
-          logger.error("Cannot append schema evolutions to fileSets in 
partition {}-{}", dataRegionId, partitionId, e);
-        }
-        return lastSet;
-      });
+      lastTsFileSetMap.compute(
+          partitionId,
+          (pid, lastSet) -> {
+            if (lastSet == null) {
+              lastSet = createNewFileSet(maxVersion, partitionId);
+            } else if (lastSet.getEndVersion() < maxVersion) {
+              lastSet = createNewFileSet(maxVersion, partitionId);
+            }
+            try {
+              lastSet.appendSchemaEvolution(schemaEvolutions);
+            } catch (IOException e) {
+              logger.error(
+                  "Cannot append schema evolutions to fileSets in partition 
{}-{}",
+                  dataRegionId,
+                  partitionId,
+                  e);
+            }
+            return lastSet;
+          });
     }
   }
 
   public void applySchemaEvolutionToObjects(List<SchemaEvolution> 
schemaEvolutions) {
+    for (SchemaEvolution schemaEvolution : schemaEvolutions) {
+      if (schemaEvolution instanceof TableRename) {
+        TableRename tableRename = (TableRename) schemaEvolution;
+        renameTableForObjects(tableRename.getNameBefore(), 
tableRename.getNameAfter());
+      } else if (schemaEvolution instanceof ColumnRename) {
+        ColumnRename columnRename = (ColumnRename) schemaEvolution;
+        if (columnRename.getDataType() == TSDataType.OBJECT) {
+          renameMeasurementForObjects(columnRename.getTableName(), 
columnRename.getNameBefore(), columnRename.getNameAfter());
+        }
+      }
+    }
+  }
+
+  private void renameTableForObjects(String nameBefore, String nameAfter) {
+    // TODO-SchemaEvolution
+    throw new UnsupportedOperationException();
+  }
+
+  private void renameMeasurementForObjects(String tableName, String 
nameBefore, String nameAfter) {
     // TODO-SchemaEvolution
     throw new UnsupportedOperationException();
   }
@@ -1686,7 +1709,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
-    for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
+    for (Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
       TsFileProcessor tsFileProcessor = entry.getKey();
       InsertRowsNode subInsertRowsNode = entry.getValue();
       try {
@@ -2661,7 +2684,8 @@ public class DataRegion implements IDataRegionForQuery {
         deviceIdBackThen = evolvedSchema.rewriteDeviceId(singleDeviceId);
       }
 
-      if (!tsFileResource.isSatisfied(deviceIdBackThen, globalTimeFilter, 
isSeq, context.isDebug())) {
+      if (!tsFileResource.isSatisfied(
+          deviceIdBackThen, globalTimeFilter, isSeq, context.isDebug())) {
         continue;
       }
       try {
@@ -2885,12 +2909,12 @@ public class DataRegion implements IDataRegionForQuery {
     for (TableDeletionEntry modEntry : deleteDataNode.getModEntries()) {
       long startTime = modEntry.getStartTime();
       long endTime = modEntry.getEndTime();
-      for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
+      for (Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
         if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, 
entry.getKey())) {
           involvedProcessors.add(entry.getValue());
         }
       }
-      for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
+      for (Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
         if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, 
entry.getKey())) {
           involvedProcessors.add(entry.getValue());
         }
@@ -2926,13 +2950,13 @@ public class DataRegion implements IDataRegionForQuery {
     DeleteDataNode deleteDataNode =
         new DeleteDataNode(new PlanNodeId(""), 
Collections.singletonList(path), startTime, endTime);
     deleteDataNode.setSearchIndex(searchIndex);
-    for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
+    for (Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
       if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, 
entry.getKey())) {
         WALFlushListener walFlushListener = 
entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
         walFlushListeners.add(walFlushListener);
       }
     }
-    for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
+    for (Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
       if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, 
entry.getKey())) {
         WALFlushListener walFlushListener = 
entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
         walFlushListeners.add(walFlushListener);
@@ -3060,7 +3084,7 @@ public class DataRegion implements IDataRegionForQuery {
       ITimeIndex timeIndex = sealedTsFile.getTimeIndex();
 
       if ((timeIndex instanceof ArrayDeviceTimeIndex)
-          && (deletion.getType() == ModEntry.ModType.TABLE_DELETION)) {
+          && (deletion.getType() == ModType.TABLE_DELETION)) {
         ArrayDeviceTimeIndex deviceTimeIndex = (ArrayDeviceTimeIndex) 
timeIndex;
         Set<IDeviceID> devicesInFile = deviceTimeIndex.getDevices();
         boolean onlyOneTable = false;
@@ -4225,7 +4249,7 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
-      for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
+      for (Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
         TsFileProcessor tsFileProcessor = entry.getKey();
         InsertRowsNode subInsertRowsNode = entry.getValue();
         try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
index e7c13f149a9..3e72acaa34d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java
@@ -19,10 +19,9 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 import java.util.HashMap;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
index cf8badd6d08..d02835015eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java
@@ -19,20 +19,16 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
 import org.apache.iotdb.db.storageengine.StorageEngine;
-
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class HashLastFlushTimeMap implements ILastFlushTimeMap {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
index f78b6822e90..9b685407326 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 public interface ILastFlushTime {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
index 40186cef573..ca3a5e37ced 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 import java.util.Map;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
index 100d0238618..e37ce43b930 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion;
 
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+
 import org.apache.tsfile.file.metadata.IDeviceID;
 
 public class PartitionLastFlushTime implements ILastFlushTime {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
index f727e2846e7..297518538dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java
@@ -19,12 +19,14 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.task;
 
-import java.io.IOException;
-import java.io.InputStream;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.utils.io.StreamSerializable;
+
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 public interface DataRegionTask extends Runnable, StreamSerializable {
 
   long getTaskId();
@@ -38,7 +40,8 @@ public interface DataRegionTask extends Runnable, 
StreamSerializable {
   }
 
   @SuppressWarnings("SwitchStatementWithTooFewBranches")
-  static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion 
dataRegion) throws IOException {
+  static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion 
dataRegion)
+      throws IOException {
     int typeOrdinal = ReadWriteForEncodingUtils.readVarInt(stream);
     if (typeOrdinal < 0 || typeOrdinal >= TaskType.values().length) {
       throw new IOException("Invalid task type: " + typeOrdinal);
@@ -49,10 +52,10 @@ public interface DataRegionTask extends Runnable, 
StreamSerializable {
     DataRegionTask task;
     switch (taskType) {
       case SchemaEvolutionTask:
-       task = new SchemaEvolutionTask(dataRegion);
-       break;
-       default:
-         throw new IOException("Invalid task type: " + taskType);
+        task = new SchemaEvolutionTask(dataRegion);
+        break;
+      default:
+        throw new IOException("Invalid task type: " + taskType);
     }
     task.deserialize(stream);
     task.setTaskId(taskId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
index 5441b9b19c7..59339aed007 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.task;
 
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -27,9 +32,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("ResultOfMethodCallIgnored")
 public class DataRegionTaskManager {
@@ -54,17 +56,19 @@ public class DataRegionTaskManager {
       return;
     }
 
-    Arrays.sort(files, (f1, f2) -> {
-      String fileName1 = f1.getName();
-      int suffixIndex1 = fileName1.indexOf(".");
-      long taskId1 = Long.parseLong(fileName1.substring(0, suffixIndex1));
+    Arrays.sort(
+        files,
+        (f1, f2) -> { // order files by the numeric task id preceding the first '.'
+          String fileName1 = f1.getName();
+          int suffixIndex1 = fileName1.indexOf(".");
+          long taskId1 = Long.parseLong(fileName1.substring(0, suffixIndex1));
 
-      String fileName2 = f2.getName();
-      int suffixIndex2 = fileName2.indexOf(".");
-      long taskId2 = Long.parseLong(fileName1.substring(0, suffixIndex2));
+          String fileName2 = f2.getName();
+          int suffixIndex2 = fileName2.indexOf(".");
+          long taskId2 = Long.parseLong(fileName2.substring(0, suffixIndex2));
 
-      return Long.compare(taskId1, taskId2);
-    });
+          return Long.compare(taskId1, taskId2);
+        });
 
     for (File file : files) {
       String fileName = file.getName();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
index e3bae4c571c..9a361ca4700 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java
@@ -19,14 +19,16 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.task;
 
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
-import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
-import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 
 public class SchemaEvolutionTask implements DataRegionTask {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
index 4986107a32d..80ff4b0f3ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import java.nio.ByteBuffer;
-import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -28,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 /** A schema evolution operation that renames a column in a table schema. */
 public class ColumnRename implements SchemaEvolution {
@@ -40,10 +39,11 @@ public class ColumnRename implements SchemaEvolution {
   // for deserialization
   public ColumnRename() {}
 
-  public ColumnRename(String tableName, String nameBefore, String nameAfter) {
+  public ColumnRename(String tableName, String nameBefore, String nameAfter, 
TSDataType dataType) {
     this.tableName = tableName.toLowerCase();
     this.nameBefore = nameBefore.toLowerCase();
     this.nameAfter = nameAfter.toLowerCase();
+    this.dataType = dataType;
   }
 
   @Override
@@ -72,7 +72,7 @@ public class ColumnRename implements SchemaEvolution {
     nameBefore = ReadWriteIOUtils.readVarIntString(stream);
     nameAfter = ReadWriteIOUtils.readVarIntString(stream);
     byte category = ReadWriteIOUtils.readByte(stream);
-    if (category != -1)  {
+    if (category != -1) {
       dataType = TSDataType.values()[category];
     }
   }
@@ -93,7 +93,7 @@ public class ColumnRename implements SchemaEvolution {
     nameBefore = ReadWriteIOUtils.readVarIntString(buffer);
     nameAfter = ReadWriteIOUtils.readVarIntString(buffer);
     byte category = ReadWriteIOUtils.readByte(buffer);
-    if (category != -1)  {
+    if (category != -1) {
       dataType = TSDataType.values()[category];
     }
   }
@@ -102,8 +102,19 @@ public class ColumnRename implements SchemaEvolution {
     return dataType;
   }
 
-  public void setDataType(
-      TSDataType dataType) {
+  public void setDataType(TSDataType dataType) {
     this.dataType = dataType;
   }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getNameBefore() {
+    return nameBefore;
+  }
+
+  public String getNameAfter() {
+    return nameAfter;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
index cd899ccc97c..8913d795eb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import java.util.HashMap;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 
 public class EvolvedSchema {
   // the evolved table names after applying all schema evolution operations
@@ -95,10 +95,12 @@ public class EvolvedSchema {
 
   @Override
   public String toString() {
-    return "EvolvedSchema{" +
-        "originalTableNames=" + originalTableNames +
-        ", originalColumnNames=" + originalColumnNames +
-        '}';
+    return "EvolvedSchema{"
+        + "originalTableNames="
+        + originalTableNames
+        + ", originalColumnNames="
+        + originalColumnNames
+        + '}';
   }
 
   public IDeviceID rewriteDeviceId(IDeviceID deviceID) {
@@ -128,32 +130,35 @@ public class EvolvedSchema {
   }
 
   public static EvolvedSchema merge(EvolvedSchema... schemas) {
-      EvolvedSchema firstNotNullSchema = null;
-      int i = 0;
-      for (; i < schemas.length; i++) {
-        if (schemas[i] != null) {
-          firstNotNullSchema = schemas[i];
-          i++;
-          break;
-        }
+    EvolvedSchema firstNotNullSchema = null;
+    int i = 0;
+    for (; i < schemas.length; i++) {
+      if (schemas[i] != null) {
+        firstNotNullSchema = schemas[i];
+        i++;
+        break;
       }
+    }
 
-      if (firstNotNullSchema == null) {
-        return null;
-      }
-      EvolvedSchema mergedSchema = deepCopy(firstNotNullSchema);
+    if (firstNotNullSchema == null) {
+      return null;
+    }
+    EvolvedSchema mergedSchema = deepCopy(firstNotNullSchema);
 
     for (; i < schemas.length; i++) {
       if (schemas[i] != null) {
         EvolvedSchema newSchema = schemas[i];
-        for (Entry<String, String> finalOriginalTableName : 
newSchema.originalTableNames.entrySet()) {
+        for (Entry<String, String> finalOriginalTableName :
+            newSchema.originalTableNames.entrySet()) {
           if (!finalOriginalTableName.getValue().isEmpty()) {
-            mergedSchema.renameTable(finalOriginalTableName.getValue(), 
finalOriginalTableName.getKey());
+            mergedSchema.renameTable(
+                finalOriginalTableName.getValue(), 
finalOriginalTableName.getKey());
           }
         }
-        for (Entry<String, Map<String, String>> 
finalTableNameColumnNameMapEntry : newSchema.originalColumnNames.entrySet()) {
-          for (Entry<String, String> finalColNameOriginalColNameEntry : 
finalTableNameColumnNameMapEntry.getValue()
-              .entrySet()) {
+        for (Entry<String, Map<String, String>> 
finalTableNameColumnNameMapEntry :
+            newSchema.originalColumnNames.entrySet()) {
+          for (Entry<String, String> finalColNameOriginalColNameEntry :
+              finalTableNameColumnNameMapEntry.getValue().entrySet()) {
             if (!finalColNameOriginalColNameEntry.getValue().isEmpty()) {
               String finalTableName = 
finalTableNameColumnNameMapEntry.getKey();
               String finalColName = finalColNameOriginalColNameEntry.getKey();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
index b017f6509b2..f1ac8edbcfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import java.nio.ByteBuffer;
 import org.apache.iotdb.db.utils.io.BufferSerializable;
 import org.apache.iotdb.db.utils.io.StreamSerializable;
 
@@ -27,6 +26,9 @@ import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 /** A schema evolution operation that can be applied to a TableSchemaMap. */
 public interface SchemaEvolution extends StreamSerializable, 
BufferSerializable {
@@ -77,4 +79,13 @@ public interface SchemaEvolution extends StreamSerializable, 
BufferSerializable
     evolution.deserialize(buffer);
     return evolution;
   }
+
+  static List<SchemaEvolution> createListFrom(ByteBuffer buffer) {
+    int size = ReadWriteForEncodingUtils.readVarInt(buffer);
+    List<SchemaEvolution> list = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      list.add(createFrom(buffer));
+    }
+    return list;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
index e90c1ac32ca..1c4343cd154 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java
@@ -42,6 +42,7 @@ public class SchemaEvolutionFile {
 
   /**
    * Recover the SchemaEvolutionFile if it is broken.
+   *
    * @return true if the file exists false otherwise
    * @throws IOException if the file cannot be recovered
    */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
index dfc80974e85..a37557f45aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
@@ -19,11 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
@@ -32,6 +27,10 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /** A schema evolution operation that renames a table in a schema map. */
 public class TableRename implements SchemaEvolution {
@@ -107,9 +106,10 @@ public class TableRename implements SchemaEvolution {
   }
 
   public <T> void rewriteMap(Map<IDeviceID, T> map) {
-    List<IDeviceID> affectedDeviceId = map.keySet().stream()
-        .filter(k -> k.getTableName().equals(getNameBefore())).collect(
-            Collectors.toList());
+    List<IDeviceID> affectedDeviceId =
+        map.keySet().stream()
+            .filter(k -> k.getTableName().equals(getNameBefore()))
+            .collect(Collectors.toList());
     for (IDeviceID deviceID : affectedDeviceId) {
       IDeviceID newDeviceId = rewriteDeviceId(deviceID);
       T removed = map.remove(deviceID);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
index 5c43418fa64..aaa7bb195ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java
@@ -123,9 +123,6 @@ public class TsFileSet implements Comparable<TsFileSet> {
 
   @Override
   public String toString() {
-    return "TsFileSet{" +
-        "endVersion=" + endVersion +
-        ", fileSetDir=" + fileSetDir +
-        '}';
+    return "TsFileSet{" + "endVersion=" + endVersion + ", fileSetDir=" + 
fileSetDir + '}';
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java
index f41ae6de8de..3a1841190cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java
@@ -19,12 +19,12 @@
 
 package org.apache.iotdb.db.utils.io;
 
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 
 // TODO: move to TsFile
 public class IOUtils {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index e896a599c43..014874e5893 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import java.nio.charset.StandardCharsets;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -94,6 +93,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1789,41 +1789,59 @@ public class DataRegionTest {
       throws IllegalPathException, WriteProcessException, 
QueryProcessException, IOException {
     String[] measurements = {"tag1", "s1", "s2"};
     MeasurementSchema[] measurementSchemas = {
-        new MeasurementSchema("tag1", TSDataType.STRING),
-        new MeasurementSchema("s1", TSDataType.INT64),
-        new MeasurementSchema("s2", TSDataType.DOUBLE)
+      new MeasurementSchema("tag1", TSDataType.STRING),
+      new MeasurementSchema("s1", TSDataType.INT64),
+      new MeasurementSchema("s2", TSDataType.DOUBLE)
     };
-    RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode(new 
PlanNodeId(""),
-        new PartialPath(new String[] {"table1"}),
-        true,
-        measurements,
-        new TSDataType[]{TSDataType.STRING, TSDataType.INT64, 
TSDataType.DOUBLE},
-        measurementSchemas,
-        10,
-        new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 
1.0},
-        false,
-        new TsTableColumnCategory[]{TsTableColumnCategory.TAG, 
TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD});
+    RelationalInsertRowNode insertRowNode =
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"table1"}),
+            true,
+            measurements,
+            new TSDataType[] {TSDataType.STRING, TSDataType.INT64, 
TSDataType.DOUBLE},
+            measurementSchemas,
+            10,
+            new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 
1L, 1.0},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, 
TsTableColumnCategory.FIELD
+            });
     dataRegion.insert(insertRowNode);
 
     // table1 -> table2
     dataRegion.applySchemaEvolution(Collections.singletonList(new 
TableRename("table1", "table2")));
 
     // cannot query with the old name
-    IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new 
String[]{"table1", "tag1"});
-    List<IFullPath> fullPaths = Arrays.asList(
-        new AlignedFullPath(deviceID1, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas))
-    );
-    QueryDataSource dataSource = dataRegion.query(fullPaths, deviceID1, new 
QueryContext(), null,
-        Collections.singletonList(0L), Long.MAX_VALUE);
+    IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new String[] 
{"table1", "tag1"});
+    List<IFullPath> fullPaths =
+        Arrays.asList(
+            new AlignedFullPath(
+                deviceID1, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas)));
+    QueryDataSource dataSource =
+        dataRegion.query(
+            fullPaths,
+            deviceID1,
+            new QueryContext(),
+            null,
+            Collections.singletonList(0L),
+            Long.MAX_VALUE);
     assertTrue(dataSource.getSeqResources().isEmpty());
 
     // can query with the new name
-    IDeviceID deviceID2 = Factory.DEFAULT_FACTORY.create(new 
String[]{"table2", "tag1"});
-    fullPaths = Arrays.asList(
-        new AlignedFullPath(deviceID2, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas))
-    );
-    dataSource = dataRegion.query(fullPaths, deviceID2, new QueryContext(), 
null,
-        Collections.singletonList(0L), Long.MAX_VALUE);
+    IDeviceID deviceID2 = Factory.DEFAULT_FACTORY.create(new String[] 
{"table2", "tag1"});
+    fullPaths =
+        Arrays.asList(
+            new AlignedFullPath(
+                deviceID2, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas)));
+    dataSource =
+        dataRegion.query(
+            fullPaths,
+            deviceID2,
+            new QueryContext(),
+            null,
+            Collections.singletonList(0L),
+            Long.MAX_VALUE);
     assertEquals(1, dataSource.getSeqResources().size());
 
     DataNodeTableCache.getInstance()
@@ -1832,32 +1850,50 @@ public class DataRegionTest {
         .commitUpdateTable(dataRegion.getDatabaseName(), 
StatementTestUtils.tableName(1), null);
 
     // write again with table1
-    insertRowNode = new RelationalInsertRowNode(new PlanNodeId(""),
-        new PartialPath(new String[] {"table1"}),
-        true,
-        measurements,
-        new TSDataType[]{TSDataType.STRING, TSDataType.INT64, 
TSDataType.DOUBLE},
-        measurementSchemas,
-        10,
-        new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 
1.0},
-        false,
-        new TsTableColumnCategory[]{TsTableColumnCategory.TAG, 
TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD});
+    insertRowNode =
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"table1"}),
+            true,
+            measurements,
+            new TSDataType[] {TSDataType.STRING, TSDataType.INT64, 
TSDataType.DOUBLE},
+            measurementSchemas,
+            10,
+            new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 
1L, 1.0},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, 
TsTableColumnCategory.FIELD
+            });
     dataRegion.insert(insertRowNode);
 
     // can query with table1
-    fullPaths = Arrays.asList(
-        new AlignedFullPath(deviceID1, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas))
-    );
-    dataSource = dataRegion.query(fullPaths, deviceID1, new QueryContext(), 
null,
-        Collections.singletonList(0L), Long.MAX_VALUE);
+    fullPaths =
+        Arrays.asList(
+            new AlignedFullPath(
+                deviceID1, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas)));
+    dataSource =
+        dataRegion.query(
+            fullPaths,
+            deviceID1,
+            new QueryContext(),
+            null,
+            Collections.singletonList(0L),
+            Long.MAX_VALUE);
     assertEquals(1, dataSource.getUnseqResources().size());
 
     // can query with table2
-    fullPaths = Arrays.asList(
-        new AlignedFullPath(deviceID2, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas))
-    );
-    dataSource = dataRegion.query(fullPaths, deviceID2, new QueryContext(), 
null,
-        Collections.singletonList(0L), Long.MAX_VALUE);
+    fullPaths =
+        Arrays.asList(
+            new AlignedFullPath(
+                deviceID2, Arrays.asList(measurements), 
Arrays.asList(measurementSchemas)));
+    dataSource =
+        dataRegion.query(
+            fullPaths,
+            deviceID2,
+            new QueryContext(),
+            null,
+            Collections.singletonList(0L),
+            Long.MAX_VALUE);
     assertEquals(1, dataSource.getSeqResources().size());
   }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
index d9c76be4710..5f4f7e8280d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.tsfile.enums.TSDataType;
+import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.List;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 public class EvolvedSchemaTest {
 
@@ -33,7 +35,7 @@ public class EvolvedSchemaTest {
     List<SchemaEvolution> schemaEvolutionList =
         Arrays.asList(
             new TableRename("t1", "t2"),
-            new ColumnRename("t2", "s1", "s2"),
+            new ColumnRename("t2", "s1", "s2", TSDataType.INT32),
             new TableRename("t3", "t1"));
     EvolvedSchema oldSchema = new EvolvedSchema();
     EvolvedSchema allSchema = new EvolvedSchema();
@@ -44,7 +46,7 @@ public class EvolvedSchemaTest {
     schemaEvolutionList =
         Arrays.asList(
             new TableRename("t2", "t3"),
-            new ColumnRename("t3", "s2", "s1"),
+            new ColumnRename("t3", "s2", "s1", TSDataType.INT32),
             new TableRename("t1", "t2"));
     EvolvedSchema newSchema = new EvolvedSchema();
     schemaEvolutionList.forEach(schemaEvolution -> 
schemaEvolution.applyTo(newSchema));
@@ -54,4 +56,4 @@ public class EvolvedSchemaTest {
 
     assertEquals(allSchema, mergedShema);
   }
-}
\ No newline at end of file
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
index d4386dd8172..10348a92c17 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java
@@ -19,13 +19,14 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
-import java.io.FileOutputStream;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.junit.After;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
@@ -51,7 +52,7 @@ public class SchemaEvolutionFileTest {
     List<SchemaEvolution> schemaEvolutionList =
         Arrays.asList(
             new TableRename("t1", "t2"),
-            new ColumnRename("t2", "s1", "s2"),
+            new ColumnRename("t2", "s1", "s2", TSDataType.INT32),
             new TableRename("t3", "t1"));
     schemaEvolutionFile.append(schemaEvolutionList);
 
@@ -67,7 +68,7 @@ public class SchemaEvolutionFileTest {
     schemaEvolutionList =
         Arrays.asList(
             new TableRename("t2", "t3"),
-            new ColumnRename("t3", "s2", "s1"),
+            new ColumnRename("t3", "s2", "s1", TSDataType.INT32),
             new TableRename("t1", "t2"));
     schemaEvolutionFile.append(schemaEvolutionList);
     evolvedSchema = schemaEvolutionFile.readAsSchema();
@@ -97,7 +98,7 @@ public class SchemaEvolutionFileTest {
     List<SchemaEvolution> schemaEvolutionList =
         Arrays.asList(
             new TableRename("t1", "t2"),
-            new ColumnRename("t2", "s1", "s2"),
+            new ColumnRename("t2", "s1", "s2", TSDataType.INT32),
             new TableRename("t3", "t1"));
     schemaEvolutionFile.append(schemaEvolutionList);
 
@@ -111,7 +112,7 @@ public class SchemaEvolutionFileTest {
       fileOutputStream.write(new byte[100]);
     }
 
-    schemaEvolutionFile =  new SchemaEvolutionFile(files[0].getAbsolutePath());
+    schemaEvolutionFile = new SchemaEvolutionFile(files[0].getAbsolutePath());
     EvolvedSchema evolvedSchema = schemaEvolutionFile.readAsSchema();
     assertEquals("t1", evolvedSchema.getOriginalTableName("t2"));
     assertEquals("s1", evolvedSchema.getOriginalColumnName("t2", "s2"));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index a2510454e73..baed8bcd537 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -19,12 +19,6 @@
 
 package org.apache.iotdb.commons.utils;
 
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 
 import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
@@ -340,81 +334,6 @@ public class FileUtils {
     return true;
   }
 
-  public static <T> List<T> applyReversedIndexesOnListV2(
-      final List<Integer> filteredIndexes, final List<T> originalList) {
-    // filteredIndexes.sort(null); if necessary
-    List<T> filteredList = new ArrayList<>(originalList.size() - 
filteredIndexes.size());
-    int filteredIndexPos = 0;
-    int processingIndex = 0;
-    for (; processingIndex < originalList.size(); processingIndex++) {
-      if (filteredIndexPos >= filteredIndexes.size()) {
-        // all filteredIndexes processed, add remaining to the filteredList
-        filteredList.addAll(originalList.subList(processingIndex, 
originalList.size()));
-        break;
-      } else {
-        int filteredIndex = filteredIndexes.get(filteredIndexPos);
-        if (filteredIndex == processingIndex) {
-          // the index is filtered, move to the next filtered pos
-          filteredIndexPos ++;
-        } else {
-          // the index is not filtered, add to the filteredList
-          filteredList.add(originalList.get(processingIndex));
-        }
-      }
-    }
-    return filteredList;
-  }
-
-  public static <T> List<T> applyReversedIndexesOnListV1(
-      final List<Integer> filteredIndexes, final List<T> originalList) {
-    final Set<Integer> indexes = new HashSet<>(filteredIndexes);
-    return Objects.nonNull(originalList)
-        ? IntStream.range(0, originalList.size())
-        .filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标
-        .mapToObj(originalList::get)
-        .collect(Collectors.toList())
-        : null;
-  }
-
-  public static void main(String[] args) {
-    int elementNum = 10_000_000;
-    int filteredNum = elementNum / 10;
-    Random random = new Random();
-    List<Integer> originalList = IntStream.range(0, 
elementNum).boxed().collect(Collectors.toList());
-    List<Integer> filteredIndexes = new ArrayList<>(filteredNum);
-    for (int i = 0; i < filteredNum; i++) {
-      filteredIndexes.add(random.nextInt(elementNum));
-    }
-    filteredIndexes = 
filteredIndexes.stream().sorted().distinct().collect(Collectors.toList());
-
-    long start = System.currentTimeMillis();
-    List<Integer> appliedList = applyReversedIndexesOnListV1(filteredIndexes, 
originalList);
-    System.out.println(System.currentTimeMillis() - start);
-    Set<Integer> appliedSet = new HashSet<>(appliedList);
-    for (Integer filteredIndex : filteredIndexes) {
-      if (appliedSet.contains(filteredIndex)) {
-        System.out.println("Incorrect implementation");
-        System.exit(-1);
-      }
-    }
-
-
-    start = System.currentTimeMillis();
-    appliedList = WapplyReversedIndexesOnListV2(filteredIndexes, originalList);
-    System.out.println(System.currentTimeMillis() - start);
-    appliedSet = new HashSet<>(appliedList);
-    if (appliedList.size() != originalList.size() - filteredIndexes.size()) {
-      System.out.println("Incorrect implementation");
-      System.exit(-1);
-    }
-    for (Integer filteredIndex : filteredIndexes) {
-      if (appliedSet.contains(filteredIndex)) {
-        System.out.println("Incorrect implementation");
-        System.exit(-1);
-      }
-    }
-  }
-
   public static File createHardLink(File sourceFile, File hardlink) throws 
IOException {
     if (!hardlink.getParentFile().exists() && 
!hardlink.getParentFile().mkdirs()) {
       synchronized (FileUtils.class) {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index caaf44c16a7..fd4f15c1e1a 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -459,6 +459,12 @@ struct TDeleteDataForDeleteSchemaReq {
   3: optional bool isGeneratedByPipe
 }
 
+struct TDataRegionEvolveSchemaReq {
+  1: required list<common.TConsensusGroupId> dataRegionIdList
+  2: required binary schemaEvolutions
+  3: optional bool isGeneratedByPipe
+}
+
 struct TDeleteTimeSeriesReq {
   1: required list<common.TConsensusGroupId> schemaRegionIdList
   2: required binary pathPatternTree
@@ -1076,6 +1082,8 @@ service IDataNodeRPCService {
    */
   common.TSStatus deleteDataForDeleteSchema(TDeleteDataForDeleteSchemaReq req)
 
+  common.TSStatus evolveSchemaInDataRegion(TDataRegionEvolveSchemaReq req)
+
   /**
    * Delete matched timeseries and remove according schema black list in 
target schemRegion
    */

Reply via email to