This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit acd76238a93ca801830ab8c8ad6fab2562ecef57
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Dec 10 17:31:13 2025 +0800

    add EvolveSchemaNode
---
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +
 .../planner/plan/node/write/EvolveSchemaNode.java  | 179 +++++++++++++++++++++
 .../dataregion/tsfile/evolution/ColumnRename.java  |  22 +++
 .../tsfile/evolution/SchemaEvolution.java          |  26 ++-
 .../dataregion/tsfile/evolution/TableRename.java   |  15 ++
 .../java/org/apache/iotdb/db/utils/io/IOUtils.java |  52 ++++++
 6 files changed, 296 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 1100793ca88..5a0bdcc3caf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -107,6 +107,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueries
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -319,6 +320,7 @@ public enum PlanNodeType {
   RELATIONAL_INSERT_ROW((short) 2001),
   RELATIONAL_INSERT_ROWS((short) 2002),
   RELATIONAL_DELETE_DATA((short) 2003),
+  EVOLVE_SCHEMA((short) 2004),
   ;
 
   public static final int BYTES = Short.BYTES;
@@ -362,6 +364,8 @@ public enum PlanNodeType {
         return RelationalInsertRowsNode.deserializeFromWAL(stream);
       case 2003:
         return RelationalDeleteDataNode.deserializeFromWAL(stream);
+      case 2004:
+        return EvolveSchemaNode.deserializeFromWAL(stream);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
@@ -388,6 +392,8 @@ public enum PlanNodeType {
         return RelationalInsertRowsNode.deserializeFromWAL(buffer);
       case 2003:
         return RelationalDeleteDataNode.deserializeFromWAL(buffer);
+      case 2004:
+        return EvolveSchemaNode.deserializeFromWAL(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
@@ -713,6 +719,8 @@ public enum PlanNodeType {
         return RelationalInsertRowsNode.deserialize(buffer);
       case 2003:
         return RelationalDeleteDataNode.deserialize(buffer);
+      case 2004:
+        return EvolveSchemaNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
new file mode 100644
index 00000000000..f85ccbfe17a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.utils.io.IOUtils;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EvolveSchemaNode extends SearchNode implements WALEntryValue {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(EvolveSchemaNode.class);
+
+  protected TRegionReplicaSet regionReplicaSet;
+  protected ProgressIndex progressIndex;
+  private final List<SchemaEvolution> schemaEvolutions;
+
+  public EvolveSchemaNode(PlanNodeId id,
+      List<SchemaEvolution> schemaEvolutions) {
+    super(id);
+    this.schemaEvolutions = schemaEvolutions;
+  }
+
+  public static PlanNode deserializeFromWAL(DataInputStream stream) throws 
IOException {
+    long searchIndex = stream.readLong();
+    int size = ReadWriteForEncodingUtils.readVarInt(stream);
+    List<SchemaEvolution> evolutions = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      evolutions.add(SchemaEvolution.createFrom(stream));
+    }
+
+    EvolveSchemaNode evolveSchemaNode = new EvolveSchemaNode(new 
PlanNodeId(""), evolutions);
+    evolveSchemaNode.setSearchIndex(searchIndex);
+
+    return evolveSchemaNode;
+  }
+
+  public static PlanNode deserializeFromWAL(ByteBuffer buffer) {
+    long searchIndex = buffer.getLong();
+    int size = ReadWriteForEncodingUtils.readVarInt(buffer);
+    List<SchemaEvolution> evolutions = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      evolutions.add(SchemaEvolution.createFrom(buffer));
+    }
+
+    EvolveSchemaNode evolveSchemaNode = new EvolveSchemaNode(new 
PlanNodeId(""), evolutions);
+    evolveSchemaNode.setSearchIndex(searchIndex);
+
+    return evolveSchemaNode;
+  }
+
+  public static PlanNode deserialize(ByteBuffer buffer) {
+    int size = ReadWriteForEncodingUtils.readVarInt(buffer);
+    List<SchemaEvolution> evolutions = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      evolutions.add(SchemaEvolution.createFrom(buffer));
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(buffer);
+
+    // EvolveSchemaNode has no child
+    int ignoredChildrenSize = ReadWriteIOUtils.readInt(buffer);
+    return new EvolveSchemaNode(planNodeId, evolutions);
+  }
+
+  @Override
+  public SearchNode merge(List<SearchNode> searchNodes) {
+    return this;
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return progressIndex;
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new EvolveSchemaNode(id, schemaEvolutions);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return 0;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.EVOLVE_SCHEMA.serialize(byteBuffer);
+    IOUtils.writeList(schemaEvolutions, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.EVOLVE_SCHEMA.serialize(stream);
+    IOUtils.writeList(schemaEvolutions, stream);
+  }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    buffer.putShort(PlanNodeType.EVOLVE_SCHEMA.getNodeType());
+    buffer.putLong(searchIndex);
+    try {
+      IOUtils.writeList(schemaEvolutions, buffer);
+    } catch (IOException e) {
+      LOGGER.warn("Error writing schema evolutions to WAL", e);
+    }
+  }
+
+  @Override
+  public int serializedSize() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
index 23c18cea9f4..4986107a32d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
+import java.nio.ByteBuffer;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
@@ -76,6 +77,27 @@ public class ColumnRename implements SchemaEvolution {
     }
   }
 
+  /**
+   * Writes this rename to {@code buffer}: the evolution type ordinal as a var-int, then
+   * {@code tableName}, {@code nameBefore} and {@code nameAfter}, and finally one byte holding the
+   * data type ordinal ({@code -1} when {@code dataType} is {@code null}).
+   *
+   * @param buffer destination buffer
+   * @return the sum of the values returned by the individual write calls
+   */
+  @Override
+  public long serialize(ByteBuffer buffer) {
+    long written = ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), buffer);
+    written += ReadWriteIOUtils.writeVar(tableName, buffer);
+    written += ReadWriteIOUtils.writeVar(nameBefore, buffer);
+    written += ReadWriteIOUtils.writeVar(nameAfter, buffer);
+
+    // -1 marks an absent data type; deserialize(ByteBuffer) checks for the same marker.
+    byte typeMarker = dataType == null ? -1 : (byte) dataType.ordinal();
+    written += ReadWriteIOUtils.write(typeMarker, buffer);
+    return written;
+  }
+
+  /**
+   * Reads the fields written by {@link #serialize(ByteBuffer)} after the evolution type: three
+   * var-int strings ({@code tableName}, {@code nameBefore}, {@code nameAfter}) and one byte that
+   * indexes {@code TSDataType.values()}. A byte of {@code -1} leaves {@code dataType} untouched.
+   *
+   * @param buffer source buffer
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    tableName = ReadWriteIOUtils.readVarIntString(buffer);
+    nameBefore = ReadWriteIOUtils.readVarIntString(buffer);
+    nameAfter = ReadWriteIOUtils.readVarIntString(buffer);
+
+    byte typeOrdinal = ReadWriteIOUtils.readByte(buffer);
+    if (typeOrdinal == -1) {
+      // Marker for "no data type": keep the current value of dataType.
+      return;
+    }
+    dataType = TSDataType.values()[typeOrdinal];
+  }
+
   public TSDataType getDataType() {
     return dataType;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
index 5a670879beb..b017f6509b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
+import java.nio.ByteBuffer;
+import org.apache.iotdb.db.utils.io.BufferSerializable;
 import org.apache.iotdb.db.utils.io.StreamSerializable;
 
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
@@ -27,7 +29,7 @@ import java.io.IOException;
 import java.io.InputStream;
 
 /** A schema evolution operation that can be applied to a TableSchemaMap. */
-public interface SchemaEvolution extends StreamSerializable {
+public interface SchemaEvolution extends StreamSerializable, 
BufferSerializable {
 
   /**
    * Apply this schema evolution operation to the given metadata.
@@ -43,12 +45,11 @@ public interface SchemaEvolution extends StreamSerializable 
{
     COLUMN_RENAME
   }
 
-  static SchemaEvolution createFrom(InputStream stream) throws IOException {
-    int type = ReadWriteForEncodingUtils.readVarInt(stream);
+  static SchemaEvolution createFrom(int type) {
     if (type < 0 || type > SchemaEvolutionType.values().length) {
-      throw new IOException("Invalid evolution type: " + type);
+      throw new IllegalArgumentException("Invalid evolution type: " + type);
     }
-    SchemaEvolution evolution = null;
+    SchemaEvolution evolution;
     SchemaEvolutionType evolutionType = SchemaEvolutionType.values()[type];
     switch (evolutionType) {
       case TABLE_RENAME:
@@ -58,9 +59,22 @@ public interface SchemaEvolution extends StreamSerializable {
         evolution = new ColumnRename();
         break;
       default:
-        throw new IOException("Invalid evolution type: " + evolutionType);
+        throw new IllegalArgumentException("Invalid evolution type: " + 
evolutionType);
     }
+    return evolution;
+  }
+
+  /**
+   * Reads a var-int evolution type from {@code stream}, instantiates the matching evolution and
+   * lets it read its own fields from the same stream.
+   *
+   * @param stream source positioned at the evolution type
+   * @return the populated evolution
+   * @throws IOException if reading from the stream fails
+   */
+  static SchemaEvolution createFrom(InputStream stream) throws IOException {
+    SchemaEvolution evolution = createFrom(ReadWriteForEncodingUtils.readVarInt(stream));
     evolution.deserialize(stream);
     return evolution;
   }
+
+  /**
+   * Reads a var-int evolution type from {@code buffer}, instantiates the matching evolution and
+   * lets it read its own fields from the same buffer.
+   *
+   * @param buffer source positioned at the evolution type
+   * @return the populated evolution
+   */
+  static SchemaEvolution createFrom(ByteBuffer buffer) {
+    SchemaEvolution decoded = createFrom(ReadWriteForEncodingUtils.readVarInt(buffer));
+    decoded.deserialize(buffer);
+    return decoded;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
index 02c9eaf0c26..dfc80974e85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -70,6 +71,20 @@ public class TableRename implements SchemaEvolution {
     nameAfter = ReadWriteIOUtils.readVarIntString(stream);
   }
 
+  /**
+   * Writes this rename to {@code buffer}: the evolution type ordinal as a var-int, followed by
+   * {@code nameBefore} and {@code nameAfter}.
+   *
+   * @param buffer destination buffer
+   * @return the sum of the values returned by the individual write calls
+   */
+  @Override
+  public long serialize(ByteBuffer buffer) {
+    long written = 0;
+    written += ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), buffer);
+    written += ReadWriteIOUtils.writeVar(nameBefore, buffer);
+    written += ReadWriteIOUtils.writeVar(nameAfter, buffer);
+    return written;
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    nameBefore = ReadWriteIOUtils.readVarIntString(buffer);
+    nameAfter = ReadWriteIOUtils.readVarIntString(buffer);
+  }
+
   public String getNameBefore() {
     return nameBefore;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java
new file mode 100644
index 00000000000..f41ae6de8de
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
+// TODO: move to TsFile
+/** Static helpers for writing lists of serializable items with a var-int length prefix. */
+public class IOUtils {
+
+  private IOUtils() {
+    // util class
+  }
+
+  /**
+   * Writes {@code list.size()} as a var-int and then each element, in iteration order, via
+   * {@link BufferSerializable#serialize(ByteBuffer)}.
+   *
+   * @param list items to write
+   * @param byteBuffer destination buffer
+   * @return the var-int write result plus the sum of the values returned by each element
+   */
+  public static long writeList(List<? extends BufferSerializable> list, ByteBuffer byteBuffer) {
+    long total = ReadWriteForEncodingUtils.writeVarInt(list.size(), byteBuffer);
+    for (BufferSerializable element : list) {
+      long elementSize = element.serialize(byteBuffer);
+      total += elementSize;
+    }
+    return total;
+  }
+
+  /**
+   * Writes {@code list.size()} as a var-int and then each element, in iteration order, via
+   * {@link StreamSerializable#serialize(OutputStream)}.
+   *
+   * @param list items to write
+   * @param stream destination stream
+   * @return the var-int write result plus the sum of the values returned by each element
+   * @throws IOException if writing to the stream fails
+   */
+  public static long writeList(List<? extends StreamSerializable> list, OutputStream stream)
+      throws IOException {
+    long total = ReadWriteForEncodingUtils.writeVarInt(list.size(), stream);
+    for (StreamSerializable element : list) {
+      long elementSize = element.serialize(stream);
+      total += elementSize;
+    }
+    return total;
+  }
+}

Reply via email to