This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit acd76238a93ca801830ab8c8ad6fab2562ecef57 Author: Tian Jiang <[email protected]> AuthorDate: Wed Dec 10 17:31:13 2025 +0800 add EvolveSchemaNode --- .../plan/planner/plan/node/PlanNodeType.java | 8 + .../planner/plan/node/write/EvolveSchemaNode.java | 179 +++++++++++++++++++++ .../dataregion/tsfile/evolution/ColumnRename.java | 22 +++ .../tsfile/evolution/SchemaEvolution.java | 26 ++- .../dataregion/tsfile/evolution/TableRename.java | 15 ++ .../java/org/apache/iotdb/db/utils/io/IOUtils.java | 52 ++++++ 6 files changed, 296 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 1100793ca88..5a0bdcc3caf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -107,6 +107,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueries import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; @@ -319,6 +320,7 @@ public enum PlanNodeType { RELATIONAL_INSERT_ROW((short) 2001), RELATIONAL_INSERT_ROWS((short) 2002), RELATIONAL_DELETE_DATA((short) 2003), + EVOLVE_SCHEMA((short) 2004), ; public static 
final int BYTES = Short.BYTES; @@ -362,6 +364,8 @@ public enum PlanNodeType { return RelationalInsertRowsNode.deserializeFromWAL(stream); case 2003: return RelationalDeleteDataNode.deserializeFromWAL(stream); + case 2004: + return EvolveSchemaNode.deserializeFromWAL(stream); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -388,6 +392,8 @@ public enum PlanNodeType { return RelationalInsertRowsNode.deserializeFromWAL(buffer); case 2003: return RelationalDeleteDataNode.deserializeFromWAL(buffer); + case 2004: + return EvolveSchemaNode.deserializeFromWAL(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -713,6 +719,8 @@ public enum PlanNodeType { return RelationalInsertRowsNode.deserialize(buffer); case 2003: return RelationalDeleteDataNode.deserialize(buffer); + case 2004: + return EvolveSchemaNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java new file mode 100644 index 00000000000..f85ccbfe17a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; +import org.apache.iotdb.db.utils.io.IOUtils; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EvolveSchemaNode extends SearchNode implements WALEntryValue { + + private static final Logger LOGGER = + LoggerFactory.getLogger(EvolveSchemaNode.class); + + protected TRegionReplicaSet regionReplicaSet; + protected ProgressIndex progressIndex; + private final List<SchemaEvolution> schemaEvolutions; + + public 
EvolveSchemaNode(PlanNodeId id, + List<SchemaEvolution> schemaEvolutions) { + super(id); + this.schemaEvolutions = schemaEvolutions; + } + + public static PlanNode deserializeFromWAL(DataInputStream stream) throws IOException { + long searchIndex = stream.readLong(); + int size = ReadWriteForEncodingUtils.readVarInt(stream); + List<SchemaEvolution> evolutions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + evolutions.add(SchemaEvolution.createFrom(stream)); + } + + EvolveSchemaNode evolveSchemaNode = new EvolveSchemaNode(new PlanNodeId(""), evolutions); + evolveSchemaNode.setSearchIndex(searchIndex); + + return evolveSchemaNode; + } + + public static PlanNode deserializeFromWAL(ByteBuffer buffer) { + long searchIndex = buffer.getLong(); + int size = ReadWriteForEncodingUtils.readVarInt(buffer); + List<SchemaEvolution> evolutions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + evolutions.add(SchemaEvolution.createFrom(buffer)); + } + + EvolveSchemaNode evolveSchemaNode = new EvolveSchemaNode(new PlanNodeId(""), evolutions); + evolveSchemaNode.setSearchIndex(searchIndex); + + return evolveSchemaNode; + } + + public static PlanNode deserialize(ByteBuffer buffer) { + int size = ReadWriteForEncodingUtils.readVarInt(buffer); + List<SchemaEvolution> evolutions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + evolutions.add(SchemaEvolution.createFrom(buffer)); + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); + + // EvolveSchemaNode has no child + int ignoredChildrenSize = ReadWriteIOUtils.readInt(buffer); + return new EvolveSchemaNode(planNodeId, evolutions); + } + + @Override + public SearchNode merge(List<SearchNode> searchNodes) { + return this; + } + + @Override + public ProgressIndex getProgressIndex() { + return progressIndex; + } + + @Override + public void setProgressIndex(ProgressIndex progressIndex) { + this.progressIndex = progressIndex; + } + + @Override + public List<WritePlanNode> 
splitByPartition(IAnalysis analysis) { + return Collections.singletonList(this); + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return regionReplicaSet; + } + + @Override + public List<PlanNode> getChildren() { + return Collections.emptyList(); + } + + @Override + public void addChild(PlanNode child) { + throw new UnsupportedOperationException(); + } + + @Override + public PlanNode clone() { + return new EvolveSchemaNode(id, schemaEvolutions); + } + + @Override + public int allowedChildCount() { + return 0; + } + + @Override + public List<String> getOutputColumnNames() { + return Collections.emptyList(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.EVOLVE_SCHEMA.serialize(byteBuffer); + IOUtils.writeList(schemaEvolutions, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.EVOLVE_SCHEMA.serialize(stream); + IOUtils.writeList(schemaEvolutions, stream); + } + + @Override + public void serializeToWAL(IWALByteBufferView buffer) { + buffer.putShort(PlanNodeType.EVOLVE_SCHEMA.getNodeType()); + buffer.putLong(searchIndex); + try { + IOUtils.writeList(schemaEvolutions, buffer); + } catch (IOException e) { + LOGGER.warn("Error writing schema evolutions to WAL", e); + } + } + + @Override + public int serializedSize() { + return 0; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java index 23c18cea9f4..4986107a32d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java @@ -19,6 +19,7 @@ package 
org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; +import java.nio.ByteBuffer; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; @@ -76,6 +77,27 @@ public class ColumnRename implements SchemaEvolution { } } + @Override + public long serialize(ByteBuffer buffer) { + int size = ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), buffer); + size += ReadWriteIOUtils.writeVar(tableName, buffer); + size += ReadWriteIOUtils.writeVar(nameBefore, buffer); + size += ReadWriteIOUtils.writeVar(nameAfter, buffer); + size += ReadWriteIOUtils.write(dataType != null ? (byte) dataType.ordinal() : -1, buffer); + return size; + } + + @Override + public void deserialize(ByteBuffer buffer) { + tableName = ReadWriteIOUtils.readVarIntString(buffer); + nameBefore = ReadWriteIOUtils.readVarIntString(buffer); + nameAfter = ReadWriteIOUtils.readVarIntString(buffer); + byte category = ReadWriteIOUtils.readByte(buffer); + if (category != -1) { + dataType = TSDataType.values()[category]; + } + } + public TSDataType getDataType() { return dataType; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java index 5a670879beb..b017f6509b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; +import java.nio.ByteBuffer; +import org.apache.iotdb.db.utils.io.BufferSerializable; import org.apache.iotdb.db.utils.io.StreamSerializable; import 
org.apache.tsfile.utils.ReadWriteForEncodingUtils; @@ -27,7 +29,7 @@ import java.io.IOException; import java.io.InputStream; /** A schema evolution operation that can be applied to a TableSchemaMap. */ -public interface SchemaEvolution extends StreamSerializable { +public interface SchemaEvolution extends StreamSerializable, BufferSerializable { /** * Apply this schema evolution operation to the given metadata. @@ -43,12 +45,11 @@ public interface SchemaEvolution extends StreamSerializable { COLUMN_RENAME } - static SchemaEvolution createFrom(InputStream stream) throws IOException { - int type = ReadWriteForEncodingUtils.readVarInt(stream); + static SchemaEvolution createFrom(int type) { if (type < 0 || type > SchemaEvolutionType.values().length) { - throw new IOException("Invalid evolution type: " + type); + throw new IllegalArgumentException("Invalid evolution type: " + type); } - SchemaEvolution evolution = null; + SchemaEvolution evolution; SchemaEvolutionType evolutionType = SchemaEvolutionType.values()[type]; switch (evolutionType) { case TABLE_RENAME: @@ -58,9 +59,22 @@ public interface SchemaEvolution extends StreamSerializable { evolution = new ColumnRename(); break; default: - throw new IOException("Invalid evolution type: " + evolutionType); + throw new IllegalArgumentException("Invalid evolution type: " + evolutionType); } + return evolution; + } + + static SchemaEvolution createFrom(InputStream stream) throws IOException { + int type = ReadWriteForEncodingUtils.readVarInt(stream); + SchemaEvolution evolution = createFrom(type); evolution.deserialize(stream); return evolution; } + + static SchemaEvolution createFrom(ByteBuffer buffer) { + int type = ReadWriteForEncodingUtils.readVarInt(buffer); + SchemaEvolution evolution = createFrom(type); + evolution.deserialize(buffer); + return evolution; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java index 02c9eaf0c26..dfc80974e85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -70,6 +71,20 @@ public class TableRename implements SchemaEvolution { nameAfter = ReadWriteIOUtils.readVarIntString(stream); } + @Override + public long serialize(ByteBuffer buffer) { + long size = ReadWriteForEncodingUtils.writeVarInt(getEvolutionType().ordinal(), buffer); + size += ReadWriteIOUtils.writeVar(nameBefore, buffer); + size += ReadWriteIOUtils.writeVar(nameAfter, buffer); + return size; + } + + @Override + public void deserialize(ByteBuffer buffer) { + nameBefore = ReadWriteIOUtils.readVarIntString(buffer); + nameAfter = ReadWriteIOUtils.readVarIntString(buffer); + } + public String getNameBefore() { return nameBefore; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java new file mode 100644 index 00000000000..f41ae6de8de --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; + +// TODO: move to TsFile +public class IOUtils { + + private IOUtils() { + // util class + } + + public static long writeList(List<? extends BufferSerializable> list, ByteBuffer byteBuffer) { + long size = ReadWriteForEncodingUtils.writeVarInt(list.size(), byteBuffer); + for (BufferSerializable item : list) { + size += item.serialize(byteBuffer); + } + return size; + } + + public static long writeList(List<? extends StreamSerializable> list, OutputStream stream) + throws IOException { + long size = ReadWriteForEncodingUtils.writeVarInt(list.size(), stream); + for (StreamSerializable item : list) { + size += item.serialize(stream); + } + return size; + } +}
