This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch force_ci/support_schema_evolution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a7dd10210a0413d66a5c2fdff81d8e263e5c162b
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Dec 10 18:42:28 2025 +0800

    Add execution of EvolveSchemaNode
---
 .../dataregion/DataExecutionVisitor.java           |  12 ++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 +++
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +
 .../plan/planner/plan/node/PlanVisitor.java        |  10 ++
 .../node/pipe/PipeEnrichedEvolveSchemaNode.java    | 194 +++++++++++++++++++++
 .../planner/plan/node/write/EvolveSchemaNode.java  |  32 ++--
 .../db/storageengine/dataregion/DataRegion.java    |  88 ++++++----
 .../dataregion/DeviceLastFlushTime.java            |   3 +-
 .../dataregion/HashLastFlushTimeMap.java           |   8 +-
 .../storageengine/dataregion/ILastFlushTime.java   |   1 +
 .../dataregion/ILastFlushTimeMap.java              |   1 +
 .../dataregion/PartitionLastFlushTime.java         |   1 +
 .../dataregion/task/DataRegionTask.java            |  17 +-
 .../dataregion/task/DataRegionTaskManager.java     |  28 +--
 .../dataregion/task/SchemaEvolutionTask.java       |   8 +-
 .../dataregion/tsfile/evolution/ColumnRename.java  |  25 ++-
 .../dataregion/tsfile/evolution/EvolvedSchema.java |  53 +++---
 .../tsfile/evolution/SchemaEvolution.java          |  13 +-
 .../tsfile/evolution/SchemaEvolutionFile.java      |   1 +
 .../dataregion/tsfile/evolution/TableRename.java   |  16 +-
 .../dataregion/tsfile/fileset/TsFileSet.java       |   5 +-
 .../java/org/apache/iotdb/db/utils/io/IOUtils.java |   4 +-
 .../storageengine/dataregion/DataRegionTest.java   | 128 +++++++++-----
 .../tsfile/evolution/EvolvedSchemaTest.java        |  12 +-
 .../tsfile/evolution/SchemaEvolutionFileTest.java  |  11 +-
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  81 ---------
 .../src/main/thrift/datanode.thrift                |   8 +
 27 files changed, 535 insertions(+), 256 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index cdce05b46c0..c3141b33c81 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -286,6 +287,17 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
     }
   }
 
+  @Override
+  public TSStatus visitEvolveSchemaNode(EvolveSchemaNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.applySchemaEvolution(node.getSchemaEvolutions());
+      return StatusUtils.OK;
+    } catch (final IOException e) {
+      LOGGER.error("Error in executing plan node: {}", node, e);
+      return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+    }
+  }
+
   @Override
   public TSStatus visitPipeEnrichedDeleteDataNode(
       final PipeEnrichedDeleteDataNode node, final DataRegion context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index dcc62fcd3c8..b1fb833a38e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -158,8 +158,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; @@ -195,6 +197,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -233,6 +236,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq; import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteColumnDataReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq; @@ -787,6 +791,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface .getStatus()); } + @Override + public TSStatus evolveSchemaInDataRegion(final TDataRegionEvolveSchemaReq req) { + final List<SchemaEvolution> schemaEvolutions = + SchemaEvolution.createListFrom(req.schemaEvolutions); + return executeInternalSchemaTask( + req.getDataRegionIdList(), + consensusGroupId -> + new RegionWriteExecutor() + .execute( + new DataRegionId(consensusGroupId.getId()), + // Now the deletion plan may be re-collected here by pipe, resulting multiple + // transfer to delete time series plan. Now just ignore. + req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe() + ? 
new PipeEnrichedEvolveSchemaNode( + new EvolveSchemaNode(new PlanNodeId(""), schemaEvolutions)) + : new EvolveSchemaNode(new PlanNodeId(""), schemaEvolutions)) + .getStatus()); + } + @Override public TSStatus deleteTimeSeries(final TDeleteTimeSeriesReq req) throws TException { final PathPatternTree patternTree = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 5a0bdcc3caf..95a121e61b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode; @@ -321,6 +322,7 @@ public enum PlanNodeType { RELATIONAL_INSERT_ROWS((short) 2002), RELATIONAL_DELETE_DATA((short) 2003), EVOLVE_SCHEMA((short) 2004), + PIPE_ENRICHED_EVOLVE_SCHEMA((short) 2005), ; public static final int BYTES = Short.BYTES; @@ -366,6 +368,8 @@ public enum PlanNodeType { return RelationalDeleteDataNode.deserializeFromWAL(stream); case 2004: return EvolveSchemaNode.deserializeFromWAL(stream); + case 2005: + return PipeEnrichedEvolveSchemaNode.deserializeFromWAL(stream); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -394,6 +398,8 @@ public enum PlanNodeType { return RelationalDeleteDataNode.deserializeFromWAL(buffer); case 2004: return EvolveSchemaNode.deserializeFromWAL(buffer); + case 2005: + return PipeEnrichedEvolveSchemaNode.deserializeFromWAL(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } @@ -721,6 +727,8 @@ public enum PlanNodeType { return RelationalDeleteDataNode.deserialize(buffer); case 2004: return EvolveSchemaNode.deserialize(buffer); + case 2005: + return PipeEnrichedEvolveSchemaNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 9e8e1834247..d9218b34774 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.vie import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.DeleteLogicalViewNode; import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.RollbackLogicalViewBlackListNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedEvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode; @@ -111,6 +112,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueries import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.TimeseriesRegionScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; @@ -632,6 +634,10 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } + public R visitEvolveSchemaNode(EvolveSchemaNode node, C context) { + return visitPlan(node, context); + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // Pipe Related Node ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -644,6 +650,10 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } + public R visitPipeEnrichedEvolveSchemaNode(PipeEnrichedEvolveSchemaNode node, C context) { + return visitPlan(node, context); + } + public R visitPipeEnrichedWritePlanNode(PipeEnrichedWritePlanNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java new file mode 100644 index 00000000000..794601eeb4e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedEvolveSchemaNode.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.EvolveSchemaNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + +public class PipeEnrichedEvolveSchemaNode extends EvolveSchemaNode { + + private final EvolveSchemaNode evolveSchemaNode; + + public PipeEnrichedEvolveSchemaNode(final EvolveSchemaNode evolveSchemaNode) { + super(evolveSchemaNode.getPlanNodeId(), evolveSchemaNode.getSchemaEvolutions()); + this.evolveSchemaNode = evolveSchemaNode; + } + + public PlanNode EvolveSchemaNode() { + return evolveSchemaNode; + } + + @Override + public boolean isGeneratedByPipe() { + return evolveSchemaNode.isGeneratedByPipe(); + } + + @Override + public void markAsGeneratedByPipe() { + evolveSchemaNode.markAsGeneratedByPipe(); + } + + @Override + public PlanNodeId getPlanNodeId() { + return evolveSchemaNode.getPlanNodeId(); + } + + @Override + public void setPlanNodeId(final PlanNodeId id) { + evolveSchemaNode.setPlanNodeId(id); + } + + @Override + public ProgressIndex getProgressIndex() { + return evolveSchemaNode.getProgressIndex(); + } + + @Override + public void setProgressIndex(ProgressIndex progressIndex) { + evolveSchemaNode.setProgressIndex(progressIndex); + } + + @Override + public List<PlanNode> getChildren() { + return evolveSchemaNode.getChildren(); + } + + @Override + public void addChild(final PlanNode child) { + evolveSchemaNode.addChild(child); + } + + @Override + public PlanNodeType getType() { + return PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA; + } + + @Override + public PlanNode clone() { + return new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) evolveSchemaNode.clone()); + } + + @Override + public PlanNode createSubNode(final int subNodeId, final int startIndex, final int endIndex) { + return new PipeEnrichedEvolveSchemaNode( + (EvolveSchemaNode) evolveSchemaNode.createSubNode(subNodeId, startIndex, endIndex)); + } + + @Override + public PlanNode cloneWithChildren(final List<PlanNode> children) { + return new PipeEnrichedEvolveSchemaNode( + (EvolveSchemaNode) evolveSchemaNode.cloneWithChildren(children)); + } + + @Override + public int allowedChildCount() { + return evolveSchemaNode.allowedChildCount(); + } + + @Override + public List<String> getOutputColumnNames() { + return evolveSchemaNode.getOutputColumnNames(); + } + + @Override + public <R, C> R accept(final PlanVisitor<R, C> visitor, final C context) { + return visitor.visitPipeEnrichedEvolveSchemaNode(this, context); + } + + @Override + protected void serializeAttributes(final ByteBuffer byteBuffer) { + PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA.serialize(byteBuffer); + 
evolveSchemaNode.serialize(byteBuffer); + } + + @Override + protected void serializeAttributes(final DataOutputStream stream) throws IOException { + PlanNodeType.PIPE_ENRICHED_EVOLVE_SCHEMA.serialize(stream); + evolveSchemaNode.serialize(stream); + } + + public static PipeEnrichedEvolveSchemaNode deserialize(final ByteBuffer buffer) { + return new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) PlanNodeType.deserialize(buffer)); + } + + @Override + public boolean equals(final Object o) { + return o instanceof PipeEnrichedEvolveSchemaNode + && evolveSchemaNode.equals(((PipeEnrichedEvolveSchemaNode) o).evolveSchemaNode); + } + + @Override + public int hashCode() { + return evolveSchemaNode.hashCode(); + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return evolveSchemaNode.getRegionReplicaSet(); + } + + @Override + public List<WritePlanNode> splitByPartition(final IAnalysis analysis) { + return evolveSchemaNode.splitByPartition(analysis).stream() + .map( + plan -> + plan instanceof PipeEnrichedEvolveSchemaNode + ? plan + : new PipeEnrichedEvolveSchemaNode((EvolveSchemaNode) plan)) + .collect(Collectors.toList()); + } + + @Override + public void serializeToWAL(final IWALByteBufferView buffer) { + evolveSchemaNode.serializeToWAL(buffer); + } + + @Override + public int serializedSize() { + return evolveSchemaNode.serializedSize(); + } + + @Override + public SearchNode merge(List<SearchNode> searchNodes) { + List<SearchNode> unrichedNodes = + searchNodes.stream() + .map( + searchNode -> + (SearchNode) ((PipeEnrichedEvolveSchemaNode) searchNode).EvolveSchemaNode()) + .collect(Collectors.toList()); + return new PipeEnrichedEvolveSchemaNode( + (EvolveSchemaNode) evolveSchemaNode.merge(unrichedNodes)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java index f85ccbfe17a..b4fc82e0710 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/EvolveSchemaNode.java @@ -19,40 +19,41 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.utils.io.IOUtils; + import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + public class EvolveSchemaNode extends SearchNode implements WALEntryValue { - private static final Logger LOGGER = - LoggerFactory.getLogger(EvolveSchemaNode.class); + private static final Logger LOGGER = LoggerFactory.getLogger(EvolveSchemaNode.class); protected TRegionReplicaSet regionReplicaSet; protected ProgressIndex progressIndex; private final List<SchemaEvolution> schemaEvolutions; - public EvolveSchemaNode(PlanNodeId id, - List<SchemaEvolution> schemaEvolutions) { + public EvolveSchemaNode(PlanNodeId id, List<SchemaEvolution> schemaEvolutions) { super(id); this.schemaEvolutions = schemaEvolutions; } @@ -176,4 +177,13 @@ public class EvolveSchemaNode extends SearchNode implements WALEntryValue { public int serializedSize() { return 0; } + + public List<SchemaEvolution> getSchemaEvolutions() { + return schemaEvolutions; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitEvolveSchemaNode(this, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index b2a340d14bd..c37b84d22b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -112,6 +112,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry.ModType; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; @@ -128,8 +129,10 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; @@ -148,6 +151,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener; import 
org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; +import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector.DiskDirectorySelector; import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -167,10 +171,10 @@ import org.apache.iotdb.rpc.TSStatusCode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.thrift.TException; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.FSType; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; @@ -452,7 +456,7 @@ public class DataRegion implements IDataRegionForQuery { } private void initDiskSelector() { - final ILoadDiskSelector.DiskDirectorySelector selector = + final DiskDirectorySelector selector = (sourceDirectory, fileName, tierLevel) -> { try { return TierManager.getInstance() @@ -1021,15 +1025,12 @@ public class DataRegion implements IDataRegionForQuery { } private List<TsFileSet> recoverTsFileSets( - long partitionId, - Map<Long, List<TsFileSet>> tsFileSetMap - ) { + long partitionId, Map<Long, List<TsFileSet>> tsFileSetMap) { List<TsFileSet> tsFileSets = tsFileSetMap.computeIfAbsent( partitionId, pid -> { - File fileSetDir = - new File(getFileSetsDir(partitionId)); + File fileSetDir = new File(getFileSetsDir(partitionId)); File[] fileSets = fileSetDir.listFiles(); if (fileSets == null || fileSets.length == 0) { return Collections.emptyList(); @@ -1040,9 +1041,7 @@ public class DataRegion implements IDataRegionForQuery { try { tsFileSet = new TsFileSet( - Long.parseLong(fileSet.getName()), - fileSetDir.getAbsolutePath(), - true); + Long.parseLong(fileSet.getName()), fileSetDir.getAbsolutePath(), true); } catch (NumberFormatException e) { continue; } @@ -1058,7 +1057,6 @@ public class DataRegion implements IDataRegionForQuery { return tsFileSets; } - private Callable<Void> recoverFilesInPartition( long partitionId, DataRegionRecoveryContext context, @@ -1234,23 +1232,48 @@ public class DataRegion implements IDataRegionForQuery { for (Entry<Long, Long> partitionVersionEntry : partitionMaxFileVersions.entrySet()) { long partitionId = partitionVersionEntry.getKey(); long maxVersion = partitionVersionEntry.getValue(); - lastTsFileSetMap.compute(partitionId, (pid, lastSet) -> { - if (lastSet == null) { - lastSet = createNewFileSet(maxVersion, partitionId); - } else if (lastSet.getEndVersion() < maxVersion) { - lastSet = createNewFileSet(maxVersion, partitionId); - } - try { - lastSet.appendSchemaEvolution(schemaEvolutions); - } catch (IOException e) { - logger.error("Cannot append schema evolutions to fileSets in partition {}-{}", dataRegionId, partitionId, e); - } - return lastSet; - }); + lastTsFileSetMap.compute( + partitionId, + (pid, lastSet) -> { + if (lastSet == null) { + lastSet = createNewFileSet(maxVersion, partitionId); + } else if (lastSet.getEndVersion() < maxVersion) { + lastSet = createNewFileSet(maxVersion, partitionId); + } + try { + lastSet.appendSchemaEvolution(schemaEvolutions); + } catch (IOException e) { + logger.error( + "Cannot append schema evolutions to fileSets in partition {}-{}", 
+ dataRegionId, + partitionId, + e); + } + return lastSet; + }); } } public void applySchemaEvolutionToObjects(List<SchemaEvolution> schemaEvolutions) { + for (SchemaEvolution schemaEvolution : schemaEvolutions) { + if (schemaEvolution instanceof TableRename) { + TableRename tableRename = (TableRename) schemaEvolution; + renameTableForObjects(tableRename.getNameBefore(), tableRename.getNameAfter()); + } else if (schemaEvolution instanceof ColumnRename) { + ColumnRename columnRename = (ColumnRename) schemaEvolution; + if (columnRename.getDataType() == TSDataType.OBJECT) { + renameMeasurementForObjects(columnRename.getTableName(), columnRename.getNameBefore(), columnRename.getNameAfter()); + } + } + } + } + + private void renameTableForObjects(String nameBefore, String nameAfter) { + // TODO-SchemaEvolution + throw new UnsupportedOperationException(); + } + + private void renameMeasurementForObjects(String tableName, String nameBefore, String nameAfter) { // TODO-SchemaEvolution throw new UnsupportedOperationException(); } @@ -1686,7 +1709,7 @@ public class DataRegion implements IDataRegionForQuery { } List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); - for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { + for (Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { @@ -2661,7 +2684,8 @@ public class DataRegion implements IDataRegionForQuery { deviceIdBackThen = evolvedSchema.rewriteDeviceId(singleDeviceId); } - if (!tsFileResource.isSatisfied(deviceIdBackThen, globalTimeFilter, isSeq, context.isDebug())) { + if (!tsFileResource.isSatisfied( + deviceIdBackThen, globalTimeFilter, isSeq, context.isDebug())) { continue; } try { @@ -2885,12 +2909,12 @@ public class DataRegion implements IDataRegionForQuery { for (TableDeletionEntry modEntry : deleteDataNode.getModEntries()) { long startTime = modEntry.getStartTime(); long endTime = modEntry.getEndTime(); - for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { + for (Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { involvedProcessors.add(entry.getValue()); } } - for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { + for (Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { involvedProcessors.add(entry.getValue()); } @@ -2926,13 +2950,13 @@ public class DataRegion implements IDataRegionForQuery { DeleteDataNode deleteDataNode = new DeleteDataNode(new PlanNodeId(""), Collections.singletonList(path), startTime, endTime); deleteDataNode.setSearchIndex(searchIndex); - for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { + for (Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode); walFlushListeners.add(walFlushListener); } } - for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { + for (Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { if 
(TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode); walFlushListeners.add(walFlushListener); @@ -3060,7 +3084,7 @@ public class DataRegion implements IDataRegionForQuery { ITimeIndex timeIndex = sealedTsFile.getTimeIndex(); if ((timeIndex instanceof ArrayDeviceTimeIndex) - && (deletion.getType() == ModEntry.ModType.TABLE_DELETION)) { + && (deletion.getType() == ModType.TABLE_DELETION)) { ArrayDeviceTimeIndex deviceTimeIndex = (ArrayDeviceTimeIndex) timeIndex; Set<IDeviceID> devicesInFile = deviceTimeIndex.getDevices(); boolean onlyOneTable = false; @@ -4225,7 +4249,7 @@ public class DataRegion implements IDataRegionForQuery { // infoForMetrics[2]: ScheduleWalTimeCost // infoForMetrics[3]: ScheduleMemTableTimeCost // infoForMetrics[4]: InsertedPointsNumber - for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { + for (Entry<TsFileProcessor, InsertRowsNode> entry : tsFileProcessorMap.entrySet()) { TsFileProcessor tsFileProcessor = entry.getKey(); InsertRowsNode subInsertRowsNode = entry.getValue(); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java index e7c13f149a9..3e72acaa34d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DeviceLastFlushTime.java @@ -19,10 +19,9 @@ package org.apache.iotdb.db.storageengine.dataregion; -import java.util.List; -import java.util.stream.Collectors; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; + import org.apache.tsfile.file.metadata.IDeviceID; import java.util.HashMap; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java index cf8badd6d08..d02835015eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java @@ -19,20 +19,16 @@ package org.apache.iotdb.db.storageengine.dataregion; -import java.util.List; -import java.util.Map.Entry; -import java.util.stream.Collectors; import org.apache.iotdb.db.storageengine.StorageEngine; - -import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; + import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; public class HashLastFlushTimeMap implements ILastFlushTimeMap { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java index 
f78b6822e90..9b685407326 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTime.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; + import org.apache.tsfile.file.metadata.IDeviceID; public interface ILastFlushTime { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java index 40186cef573..ca3a5e37ced 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/ILastFlushTimeMap.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; + import org.apache.tsfile.file.metadata.IDeviceID; import java.util.Map; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java index 100d0238618..e37ce43b930 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/PartitionLastFlushTime.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; + import org.apache.tsfile.file.metadata.IDeviceID; public class PartitionLastFlushTime implements ILastFlushTime { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java index f727e2846e7..297518538dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTask.java @@ -19,12 +19,14 @@ package org.apache.iotdb.db.storageengine.dataregion.task; -import java.io.IOException; -import java.io.InputStream; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.utils.io.StreamSerializable; + import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import java.io.IOException; +import java.io.InputStream; + public interface DataRegionTask extends Runnable, StreamSerializable { long getTaskId(); @@ -38,7 +40,8 @@ public interface DataRegionTask extends Runnable, StreamSerializable { } @SuppressWarnings("SwitchStatementWithTooFewBranches") - static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion dataRegion) throws IOException { + static DataRegionTask createFrom(InputStream stream, long taskId, DataRegion dataRegion) + throws IOException { int typeOrdinal = ReadWriteForEncodingUtils.readVarInt(stream); if (typeOrdinal < 0 || typeOrdinal >= TaskType.values().length) { throw new IOException("Invalid task type: " + typeOrdinal); @@ -49,10 +52,10 @@ public interface DataRegionTask extends Runnable, 
StreamSerializable { DataRegionTask task; switch (taskType) { case SchemaEvolutionTask: - task = new SchemaEvolutionTask(dataRegion); - break; - default: - throw new IOException("Invalid task type: " + taskType); + task = new SchemaEvolutionTask(dataRegion); + break; + default: + throw new IOException("Invalid task type: " + taskType); } task.deserialize(stream); task.setTaskId(taskId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java index 5441b9b19c7..59339aed007 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/DataRegionTaskManager.java @@ -19,6 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.task; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -27,9 +32,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; -import org.apache.iotdb.db.storageengine.dataregion.DataRegion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings("ResultOfMethodCallIgnored") public class DataRegionTaskManager { @@ -54,17 +56,19 @@ public class DataRegionTaskManager { return; } - Arrays.sort(files, (f1, f2) -> { - String fileName1 = f1.getName(); - int suffixIndex1 = fileName1.indexOf("."); - long taskId1 = Long.parseLong(fileName1.substring(0, suffixIndex1)); + Arrays.sort( + files, + (f1, f2) -> { + String fileName1 = f1.getName(); + int suffixIndex1 = fileName1.indexOf("."); + long taskId1 = Long.parseLong(fileName1.substring(0, suffixIndex1)); - String fileName2 = f2.getName(); - int suffixIndex2 = fileName2.indexOf("."); - long taskId2 = Long.parseLong(fileName1.substring(0, suffixIndex2)); + String fileName2 = f2.getName(); + int suffixIndex2 = fileName2.indexOf("."); + long taskId2 = Long.parseLong(fileName1.substring(0, suffixIndex2)); - return Long.compare(taskId1, taskId2); - }); + return Long.compare(taskId1, taskId2); + }); for (File file : files) { String fileName = file.getName(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java index e3bae4c571c..9a361ca4700 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/task/SchemaEvolutionTask.java @@ -19,14 +19,16 @@ package org.apache.iotdb.db.storageengine.dataregion.task; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; + +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import org.apache.iotdb.db.storageengine.dataregion.DataRegion; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.SchemaEvolution; -import 
org.apache.tsfile.utils.ReadWriteForEncodingUtils; public class SchemaEvolutionTask implements DataRegionTask { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java index 4986107a32d..80ff4b0f3ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/ColumnRename.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.nio.ByteBuffer; -import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -28,6 +26,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; /** A schema evolution operation that renames a column in a table schema. */ public class ColumnRename implements SchemaEvolution { @@ -40,10 +39,11 @@ public class ColumnRename implements SchemaEvolution { // for deserialization public ColumnRename() {} - public ColumnRename(String tableName, String nameBefore, String nameAfter) { + public ColumnRename(String tableName, String nameBefore, String nameAfter, TSDataType dataType) { this.tableName = tableName.toLowerCase(); this.nameBefore = nameBefore.toLowerCase(); this.nameAfter = nameAfter.toLowerCase(); + this.dataType = dataType; } @Override @@ -72,7 +72,7 @@ public class ColumnRename implements SchemaEvolution { nameBefore = ReadWriteIOUtils.readVarIntString(stream); nameAfter = ReadWriteIOUtils.readVarIntString(stream); byte category = ReadWriteIOUtils.readByte(stream); - if (category != -1) { + if (category != -1) { dataType = TSDataType.values()[category]; } } @@ -93,7 +93,7 @@ public class ColumnRename implements SchemaEvolution { nameBefore = ReadWriteIOUtils.readVarIntString(buffer); nameAfter = ReadWriteIOUtils.readVarIntString(buffer); byte category = ReadWriteIOUtils.readByte(buffer); - if (category != -1) { + if (category != -1) { dataType = TSDataType.values()[category]; } } @@ -102,8 +102,19 @@ public class ColumnRename implements SchemaEvolution { return dataType; } - public void setDataType( - TSDataType dataType) { + public void setDataType(TSDataType dataType) { this.dataType = dataType; } + + public String getTableName() { + return tableName; + } + + public String getNameBefore() { + return nameBefore; + } + + public String getNameAfter() { + return nameAfter; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index cd899ccc97c..8913d795eb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.util.HashMap; +import org.apache.tsfile.file.metadata.IDeviceID; +import 
org.apache.tsfile.file.metadata.IDeviceID.Factory; + import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.IDeviceID.Factory; public class EvolvedSchema { // the evolved table names after applying all schema evolution operations @@ -95,10 +95,12 @@ public class EvolvedSchema { @Override public String toString() { - return "EvolvedSchema{" + - "originalTableNames=" + originalTableNames + - ", originalColumnNames=" + originalColumnNames + - '}'; + return "EvolvedSchema{" + + "originalTableNames=" + + originalTableNames + + ", originalColumnNames=" + + originalColumnNames + + '}'; } public IDeviceID rewriteDeviceId(IDeviceID deviceID) { @@ -128,32 +130,35 @@ public class EvolvedSchema { } public static EvolvedSchema merge(EvolvedSchema... schemas) { - EvolvedSchema firstNotNullSchema = null; - int i = 0; - for (; i < schemas.length; i++) { - if (schemas[i] != null) { - firstNotNullSchema = schemas[i]; - i++; - break; - } + EvolvedSchema firstNotNullSchema = null; + int i = 0; + for (; i < schemas.length; i++) { + if (schemas[i] != null) { + firstNotNullSchema = schemas[i]; + i++; + break; } + } - if (firstNotNullSchema == null) { - return null; - } - EvolvedSchema mergedSchema = deepCopy(firstNotNullSchema); + if (firstNotNullSchema == null) { + return null; + } + EvolvedSchema mergedSchema = deepCopy(firstNotNullSchema); for (; i < schemas.length; i++) { if (schemas[i] != null) { EvolvedSchema newSchema = schemas[i]; - for (Entry<String, String> finalOriginalTableName : newSchema.originalTableNames.entrySet()) { + for (Entry<String, String> finalOriginalTableName : + newSchema.originalTableNames.entrySet()) { if (!finalOriginalTableName.getValue().isEmpty()) { - mergedSchema.renameTable(finalOriginalTableName.getValue(), finalOriginalTableName.getKey()); + mergedSchema.renameTable( + finalOriginalTableName.getValue(), finalOriginalTableName.getKey()); } } - for (Entry<String, Map<String, String>> finalTableNameColumnNameMapEntry : newSchema.originalColumnNames.entrySet()) { - for (Entry<String, String> finalColNameOriginalColNameEntry : finalTableNameColumnNameMapEntry.getValue() - .entrySet()) { + for (Entry<String, Map<String, String>> finalTableNameColumnNameMapEntry : + newSchema.originalColumnNames.entrySet()) { + for (Entry<String, String> finalColNameOriginalColNameEntry : + finalTableNameColumnNameMapEntry.getValue().entrySet()) { if (!finalColNameOriginalColNameEntry.getValue().isEmpty()) { String finalTableName = finalTableNameColumnNameMapEntry.getKey(); String finalColName = finalColNameOriginalColNameEntry.getKey(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java index b017f6509b2..f1ac8edbcfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolution.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.nio.ByteBuffer; import org.apache.iotdb.db.utils.io.BufferSerializable; import org.apache.iotdb.db.utils.io.StreamSerializable; @@ -27,6 +26,9 @@ import 
org.apache.tsfile.utils.ReadWriteForEncodingUtils; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; /** A schema evolution operation that can be applied to a TableSchemaMap. */ public interface SchemaEvolution extends StreamSerializable, BufferSerializable { @@ -77,4 +79,13 @@ public interface SchemaEvolution extends StreamSerializable, BufferSerializable evolution.deserialize(buffer); return evolution; } + + static List<SchemaEvolution> createListFrom(ByteBuffer buffer) { + int size = ReadWriteForEncodingUtils.readVarInt(buffer); + List<SchemaEvolution> list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add(createFrom(buffer)); + } + return list; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java index e90c1ac32ca..1c4343cd154 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFile.java @@ -42,6 +42,7 @@ public class SchemaEvolutionFile { /** * Recover the SchemaEvolutionFile if it is broken. + * * @return true if the file exists false otherwise * @throws IOException if the file cannot be recovered */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java index dfc80974e85..a37557f45aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/TableRename.java @@ -19,11 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.utils.ReadWriteForEncodingUtils; @@ -32,6 +27,10 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** A schema evolution operation that renames a table in a schema map. 
*/ public class TableRename implements SchemaEvolution { @@ -107,9 +106,10 @@ public class TableRename implements SchemaEvolution { } public <T> void rewriteMap(Map<IDeviceID, T> map) { - List<IDeviceID> affectedDeviceId = map.keySet().stream() - .filter(k -> k.getTableName().equals(getNameBefore())).collect( - Collectors.toList()); + List<IDeviceID> affectedDeviceId = + map.keySet().stream() + .filter(k -> k.getTableName().equals(getNameBefore())) + .collect(Collectors.toList()); for (IDeviceID deviceID : affectedDeviceId) { IDeviceID newDeviceId = rewriteDeviceId(deviceID); T removed = map.remove(deviceID); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java index 5c43418fa64..aaa7bb195ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/fileset/TsFileSet.java @@ -123,9 +123,6 @@ public class TsFileSet implements Comparable<TsFileSet> { @Override public String toString() { - return "TsFileSet{" + - "endVersion=" + endVersion + - ", fileSetDir=" + fileSetDir + - '}'; + return "TsFileSet{" + "endVersion=" + endVersion + ", fileSetDir=" + fileSetDir + '}'; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java index f41ae6de8de..3a1841190cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/io/IOUtils.java @@ -19,12 +19,12 @@ package org.apache.iotdb.db.utils.io; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; -import org.apache.tsfile.utils.ReadWriteForEncodingUtils; // TODO: move to TsFile public class IOUtils { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index e896a599c43..014874e5893 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion; -import java.nio.charset.StandardCharsets; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; @@ -94,6 +93,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1789,41 +1789,59 @@ public class DataRegionTest { throws IllegalPathException, WriteProcessException, QueryProcessException, IOException { String[] measurements = {"tag1", "s1", "s2"}; MeasurementSchema[] measurementSchemas = { - new MeasurementSchema("tag1", TSDataType.STRING), - new MeasurementSchema("s1", TSDataType.INT64), - new MeasurementSchema("s2", TSDataType.DOUBLE) + new MeasurementSchema("tag1", TSDataType.STRING), + new 
MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE) }; - RelationalInsertRowNode insertRowNode = new RelationalInsertRowNode(new PlanNodeId(""), - new PartialPath(new String[] {"table1"}), - true, - measurements, - new TSDataType[]{TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE}, - measurementSchemas, - 10, - new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, - false, - new TsTableColumnCategory[]{TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD}); + RelationalInsertRowNode insertRowNode = + new RelationalInsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"table1"}), + true, + measurements, + new TSDataType[] {TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE}, + measurementSchemas, + 10, + new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD + }); dataRegion.insert(insertRowNode); // table1 -> table2 dataRegion.applySchemaEvolution(Collections.singletonList(new TableRename("table1", "table2"))); // cannot query with the old name - IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new String[]{"table1", "tag1"}); - List<IFullPath> fullPaths = Arrays.asList( - new AlignedFullPath(deviceID1, Arrays.asList(measurements), Arrays.asList(measurementSchemas)) - ); - QueryDataSource dataSource = dataRegion.query(fullPaths, deviceID1, new QueryContext(), null, - Collections.singletonList(0L), Long.MAX_VALUE); + IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new String[] {"table1", "tag1"}); + List<IFullPath> fullPaths = + Arrays.asList( + new AlignedFullPath( + deviceID1, Arrays.asList(measurements), Arrays.asList(measurementSchemas))); + QueryDataSource dataSource = + dataRegion.query( + fullPaths, + deviceID1, + new QueryContext(), + null, + Collections.singletonList(0L), + Long.MAX_VALUE); assertTrue(dataSource.getSeqResources().isEmpty()); // can query with the new name - IDeviceID deviceID2 = Factory.DEFAULT_FACTORY.create(new String[]{"table2", "tag1"}); - fullPaths = Arrays.asList( - new AlignedFullPath(deviceID2, Arrays.asList(measurements), Arrays.asList(measurementSchemas)) - ); - dataSource = dataRegion.query(fullPaths, deviceID2, new QueryContext(), null, - Collections.singletonList(0L), Long.MAX_VALUE); + IDeviceID deviceID2 = Factory.DEFAULT_FACTORY.create(new String[] {"table2", "tag1"}); + fullPaths = + Arrays.asList( + new AlignedFullPath( + deviceID2, Arrays.asList(measurements), Arrays.asList(measurementSchemas))); + dataSource = + dataRegion.query( + fullPaths, + deviceID2, + new QueryContext(), + null, + Collections.singletonList(0L), + Long.MAX_VALUE); assertEquals(1, dataSource.getSeqResources().size()); DataNodeTableCache.getInstance() @@ -1832,32 +1850,50 @@ public class DataRegionTest { .commitUpdateTable(dataRegion.getDatabaseName(), StatementTestUtils.tableName(1), null); // write again with table1 - insertRowNode = new RelationalInsertRowNode(new PlanNodeId(""), - new PartialPath(new String[] {"table1"}), - true, - measurements, - new TSDataType[]{TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE}, - measurementSchemas, - 10, - new Object[]{new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, - false, - new TsTableColumnCategory[]{TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD}); + insertRowNode = + new RelationalInsertRowNode( + new 
PlanNodeId(""), + new PartialPath(new String[] {"table1"}), + true, + measurements, + new TSDataType[] {TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE}, + measurementSchemas, + 10, + new Object[] {new Binary("tag1".getBytes(StandardCharsets.UTF_8)), 1L, 1.0}, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD + }); dataRegion.insert(insertRowNode); // can query with table1 - fullPaths = Arrays.asList( - new AlignedFullPath(deviceID1, Arrays.asList(measurements), Arrays.asList(measurementSchemas)) - ); - dataSource = dataRegion.query(fullPaths, deviceID1, new QueryContext(), null, - Collections.singletonList(0L), Long.MAX_VALUE); + fullPaths = + Arrays.asList( + new AlignedFullPath( + deviceID1, Arrays.asList(measurements), Arrays.asList(measurementSchemas))); + dataSource = + dataRegion.query( + fullPaths, + deviceID1, + new QueryContext(), + null, + Collections.singletonList(0L), + Long.MAX_VALUE); assertEquals(1, dataSource.getUnseqResources().size()); // can query with table2 - fullPaths = Arrays.asList( - new AlignedFullPath(deviceID2, Arrays.asList(measurements), Arrays.asList(measurementSchemas)) - ); - dataSource = dataRegion.query(fullPaths, deviceID2, new QueryContext(), null, - Collections.singletonList(0L), Long.MAX_VALUE); + fullPaths = + Arrays.asList( + new AlignedFullPath( + deviceID2, Arrays.asList(measurements), Arrays.asList(measurementSchemas))); + dataSource = + dataRegion.query( + fullPaths, + deviceID2, + new QueryContext(), + null, + Collections.singletonList(0L), + Long.MAX_VALUE); assertEquals(1, dataSource.getSeqResources().size()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java index d9c76be4710..5f4f7e8280d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchemaTest.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import static org.junit.Assert.assertEquals; +import org.apache.tsfile.enums.TSDataType; +import org.junit.Test; import java.util.Arrays; import java.util.List; -import org.junit.Test; + +import static org.junit.Assert.assertEquals; public class EvolvedSchemaTest { @@ -33,7 +35,7 @@ public class EvolvedSchemaTest { List<SchemaEvolution> schemaEvolutionList = Arrays.asList( new TableRename("t1", "t2"), - new ColumnRename("t2", "s1", "s2"), + new ColumnRename("t2", "s1", "s2", TSDataType.INT32), new TableRename("t3", "t1")); EvolvedSchema oldSchema = new EvolvedSchema(); EvolvedSchema allSchema = new EvolvedSchema(); @@ -44,7 +46,7 @@ public class EvolvedSchemaTest { schemaEvolutionList = Arrays.asList( new TableRename("t2", "t3"), - new ColumnRename("t3", "s2", "s1"), + new ColumnRename("t3", "s2", "s1", TSDataType.INT32), new TableRename("t1", "t2")); EvolvedSchema newSchema = new EvolvedSchema(); schemaEvolutionList.forEach(schemaEvolution -> schemaEvolution.applyTo(newSchema)); @@ -54,4 +56,4 @@ public class EvolvedSchemaTest { assertEquals(allSchema, mergedShema); } -} \ No newline at end of file +} diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java index d4386dd8172..10348a92c17 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/SchemaEvolutionFileTest.java @@ -19,13 +19,14 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution; -import java.io.FileOutputStream; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.enums.TSDataType; import org.junit.After; import org.junit.Test; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -51,7 +52,7 @@ public class SchemaEvolutionFileTest { List<SchemaEvolution> schemaEvolutionList = Arrays.asList( new TableRename("t1", "t2"), - new ColumnRename("t2", "s1", "s2"), + new ColumnRename("t2", "s1", "s2", TSDataType.INT32), new TableRename("t3", "t1")); schemaEvolutionFile.append(schemaEvolutionList); @@ -67,7 +68,7 @@ public class SchemaEvolutionFileTest { schemaEvolutionList = Arrays.asList( new TableRename("t2", "t3"), - new ColumnRename("t3", "s2", "s1"), + new ColumnRename("t3", "s2", "s1", TSDataType.INT32), new TableRename("t1", "t2")); schemaEvolutionFile.append(schemaEvolutionList); evolvedSchema = schemaEvolutionFile.readAsSchema(); @@ -97,7 +98,7 @@ public class SchemaEvolutionFileTest { List<SchemaEvolution> schemaEvolutionList = Arrays.asList( new TableRename("t1", "t2"), - new ColumnRename("t2", "s1", "s2"), + new ColumnRename("t2", "s1", "s2", TSDataType.INT32), new TableRename("t3", "t1")); schemaEvolutionFile.append(schemaEvolutionList); @@ -111,7 +112,7 @@ public class SchemaEvolutionFileTest { fileOutputStream.write(new byte[100]); } - schemaEvolutionFile = new SchemaEvolutionFile(files[0].getAbsolutePath()); + schemaEvolutionFile = new SchemaEvolutionFile(files[0].getAbsolutePath()); EvolvedSchema evolvedSchema = schemaEvolutionFile.readAsSchema(); assertEquals("t1", evolvedSchema.getOriginalTableName("t2")); assertEquals("s1", evolvedSchema.getOriginalColumnName("t2", "s2")); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index a2510454e73..baed8bcd537 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -19,12 +19,6 @@ package org.apache.iotdb.commons.utils; -import java.util.HashSet; -import java.util.Objects; -import java.util.Random; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.tsfile.external.commons.codec.digest.DigestUtils; @@ -340,81 +334,6 @@ public class FileUtils { return true; } - public static <T> List<T> applyReversedIndexesOnListV2( - final List<Integer> filteredIndexes, final List<T> originalList) { - // filteredIndexes.sort(null); if necessary - List<T> filteredList = new ArrayList<>(originalList.size() - filteredIndexes.size()); - int filteredIndexPos = 0; - int processingIndex = 0; - for (; processingIndex 
< originalList.size(); processingIndex++) { - if (filteredIndexPos >= filteredIndexes.size()) { - // all filteredIndexes processed, add remaining to the filteredList - filteredList.addAll(originalList.subList(processingIndex, originalList.size())); - break; - } else { - int filteredIndex = filteredIndexes.get(filteredIndexPos); - if (filteredIndex == processingIndex) { - // the index is filtered, move to the next filtered pos - filteredIndexPos ++; - } else { - // the index is not filtered, add to the filteredList - filteredList.add(originalList.get(processingIndex)); - } - } - } - return filteredList; - } - - public static <T> List<T> applyReversedIndexesOnListV1( - final List<Integer> filteredIndexes, final List<T> originalList) { - final Set<Integer> indexes = new HashSet<>(filteredIndexes); - return Objects.nonNull(originalList) - ? IntStream.range(0, originalList.size()) - .filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标 - .mapToObj(originalList::get) - .collect(Collectors.toList()) - : null; - } - - public static void main(String[] args) { - int elementNum = 10_000_000; - int filteredNum = elementNum / 10; - Random random = new Random(); - List<Integer> originalList = IntStream.range(0, elementNum).boxed().collect(Collectors.toList()); - List<Integer> filteredIndexes = new ArrayList<>(filteredNum); - for (int i = 0; i < filteredNum; i++) { - filteredIndexes.add(random.nextInt(elementNum)); - } - filteredIndexes = filteredIndexes.stream().sorted().distinct().collect(Collectors.toList()); - - long start = System.currentTimeMillis(); - List<Integer> appliedList = applyReversedIndexesOnListV1(filteredIndexes, originalList); - System.out.println(System.currentTimeMillis() - start); - Set<Integer> appliedSet = new HashSet<>(appliedList); - for (Integer filteredIndex : filteredIndexes) { - if (appliedSet.contains(filteredIndex)) { - System.out.println("Incorrect implementation"); - System.exit(-1); - } - } - - - start = System.currentTimeMillis(); - appliedList = WapplyReversedIndexesOnListV2(filteredIndexes, originalList); - System.out.println(System.currentTimeMillis() - start); - appliedSet = new HashSet<>(appliedList); - if (appliedList.size() != originalList.size() - filteredIndexes.size()) { - System.out.println("Incorrect implementation"); - System.exit(-1); - } - for (Integer filteredIndex : filteredIndexes) { - if (appliedSet.contains(filteredIndex)) { - System.out.println("Incorrect implementation"); - System.exit(-1); - } - } - } - public static File createHardLink(File sourceFile, File hardlink) throws IOException { if (!hardlink.getParentFile().exists() && !hardlink.getParentFile().mkdirs()) { synchronized (FileUtils.class) { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index caaf44c16a7..fd4f15c1e1a 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -459,6 +459,12 @@ struct TDeleteDataForDeleteSchemaReq { 3: optional bool isGeneratedByPipe } +struct TDataRegionEvolveSchemaReq { + 1: required list<common.TConsensusGroupId> dataRegionIdList + 2: required binary schemaEvolutions + 3: optional bool isGeneratedByPipe +} + struct TDeleteTimeSeriesReq { 1: required list<common.TConsensusGroupId> schemaRegionIdList 2: required binary pathPatternTree @@ -1076,6 +1082,8 @@ service IDataNodeRPCService { */ common.TSStatus deleteDataForDeleteSchema(TDeleteDataForDeleteSchemaReq req) + 
common.TSStatus evolveSchemaInDataRegion(TDataRegionEvolveSchemaReq req) + /** * Delete matched timeseries and remove according schema black list in target schemRegion */
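Editor's note (not part of the commit): the new TDataRegionEvolveSchemaReq struct and the evolveSchemaInDataRegion RPC are only declared in this change, so a caller-side sketch may help readers. The Java below is a minimal, hypothetical example of how a coordinator could assemble the request. It assumes the standard Thrift-generated Java constructors/setters, the common TConsensusGroupId/TConsensusGroupType types, and the usual datanode thrift namespace; how the List<SchemaEvolution> is encoded into the binary schemaEvolutions field, and which internal client issues the call, are defined elsewhere in this change set and are only assumed here.

    // Hypothetical caller-side sketch, NOT part of this commit.
    import java.nio.ByteBuffer;
    import java.util.Collections;

    import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
    import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
    // Package of the generated request type is assumed from the datanode thrift namespace.
    import org.apache.iotdb.mpp.rpc.thrift.TDataRegionEvolveSchemaReq;

    public class EvolveSchemaRequestSketch {

      // Builds a request targeting a single data region. "serializedEvolutions" is the
      // binary form of a List<SchemaEvolution>; its exact encoding is not shown here.
      public static TDataRegionEvolveSchemaReq buildRequest(
          int dataRegionId, byte[] serializedEvolutions) {
        // Identify the target data region by its consensus group id.
        TConsensusGroupId regionId =
            new TConsensusGroupId(TConsensusGroupType.DataRegion, dataRegionId);

        TDataRegionEvolveSchemaReq req = new TDataRegionEvolveSchemaReq();
        req.setDataRegionIdList(Collections.singletonList(regionId));
        req.setSchemaEvolutions(ByteBuffer.wrap(serializedEvolutions));
        req.setIsGeneratedByPipe(false);
        return req;
        // An internal DataNode client would then invoke:
        //   client.evolveSchemaInDataRegion(req);
      }
    }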
