This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch copy_to in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0fa94cd1b21ab62858c4eca3ff140e6f64084210 Author: shuwenwei <[email protected]> AuthorDate: Fri Mar 27 14:30:12 2026 +0800 copy to tsfile --- .../query/recent/copyto/IoTDBCopyToTsFileIT.java | 407 +++++++++++++++++++++ .../queryengine/common/header/DatasetHeader.java | 7 +- .../operator/process/copyto/CopyToOptions.java | 87 +++++ .../process/copyto/IFormatCopyToWriter.java | 34 ++ .../process/copyto/TableCopyToOperator.java | 194 ++++++++++ .../process/copyto/tsfile/CopyToTsFileOptions.java | 202 ++++++++++ .../copyto/tsfile/TsFileFormatCopyToWriter.java | 197 ++++++++++ .../plan/planner/TableOperatorGenerator.java | 28 ++ .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/relational/analyzer/Analysis.java | 11 + .../relational/analyzer/StatementAnalyzer.java | 8 + .../relational/planner/TableLogicalPlanner.java | 56 ++- .../distribute/TableDistributedPlanGenerator.java | 8 + .../TableModelTypeProviderExtractor.java | 7 + .../planner/iterative/rule/PruneCopyToColumns.java | 47 +++ .../plan/relational/planner/node/CopyToNode.java | 135 +++++++ .../plan/relational/planner/node/Patterns.java | 4 + .../optimizations/LogicalOptimizeFactory.java | 2 + .../optimizations/UnaliasSymbolReferences.java | 19 + .../plan/relational/sql/ast/AstVisitor.java | 4 + .../plan/relational/sql/ast/CopyTo.java | 107 ++++++ .../sql/ast/DefaultTraversalVisitor.java | 6 + .../plan/relational/sql/parser/AstBuilder.java | 62 ++++ .../plan/relational/sql/util/SqlFormatter.java | 14 + .../db/storageengine/rescon/disk/TierManager.java | 25 ++ .../apache/iotdb/commons/conf/IoTDBConstant.java | 1 + .../schema/column/ColumnHeaderConstant.java | 15 + .../apache/iotdb/commons/schema/table/TsTable.java | 14 + .../db/relational/grammar/sql/RelationalSql.g4 | 30 +- 29 files changed, 1731 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java new file mode 100644 index 00000000000..8a3cbc440ed --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent.copyto; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBCopyToTsFileIT { + + private static final String DATABASE_NAME = "test_db"; + + protected static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "create table table1(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_1', 't2', 2, 2)", + "insert into table1(time, tag1, tag2, s1, s2) values (3, 't1_1', 't2', 3, 3)", + "insert into table1(time, tag1, tag2, s1, s2) values (1, 't1_2', 't2', 1, 1)", + "insert into table1(time, tag1, tag2, s1, s2) values (2, 't1_2', 't2', 2, 2)", + "insert into table1(time, tag1, tag2, s1, s2) values (3, 't1_2', 't2', 3, 3)", + "create table table2(tag1 string tag, tag2 string tag, s1 int32 field, s2 int32 field)", + "insert into table2(time, tag1, tag2, s1, s2) values (1, 't1_1', 't2', 1, 1)", + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testCopyTable() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy table1 to '1.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time", timeColumn); + Assert.assertEquals("[tag1, tag2]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_1", "t2")); + Assert.assertEquals(3, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_2", "t2")); + Assert.assertEquals(3, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testCopySelectAllColumns() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy (select * from table1) to '2.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time", timeColumn); + Assert.assertEquals("[tag1, tag2]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_1", "t2")); + Assert.assertEquals(3, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_2", "t2")); + Assert.assertEquals(3, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testCopySelectSpecifiedColumns() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy table1(time,tag2,tag1,s1) to '3.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time", timeColumn); + Assert.assertEquals("[tag1, tag2]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_1", "t2")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_2", "t2")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testCopyWithSpecifiedTag() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy table1(time,tag1,tag2,s1) to '4.tsfile' with (tags(tag2,tag1))"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time", timeColumn); + Assert.assertEquals("[tag2, tag1]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t2", "t1_1")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t2", "t1_2")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testCopyWithSpecifiedTagAndTime() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy (select time as t, s1, tag2 as renamed_tag2, tag1 as renamed_tag1 from table1) to '6.tsfile' with (TIME t, tags(renamed_tag1,renamed_tag2))"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("t", timeColumn); + Assert.assertEquals("[renamed_tag1, renamed_tag2]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_1", "t2")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_2", "t2")); + Assert.assertEquals(2, timeseriesMetadataList.size()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(3, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testAutoGenerateTimeColumn() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy (select time as t, tag1, tag2, s1, s2 from table1) to '7.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(2, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time(auto_gen)", timeColumn); + Assert.assertEquals("[tag1, tag2]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(2, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_1", "t2")); + Assert.assertEquals(4, timeseriesMetadataList.size()); + Assert.assertEquals(0, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(2, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1", "t1_2", "t2")); + Assert.assertEquals(4, timeseriesMetadataList.size()); + Assert.assertEquals(0, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(2, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testAutoGenerateTimeColumnWithoutTag() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + SessionDataSet sessionDataSet = session.executeQueryStatement("copy (select time as t, tag1 as tag1_field, tag2 as tag2_field, s1, s2 from table1) to '8.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(6, rowCount); + Assert.assertEquals(1, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("table1", tableName); + Assert.assertEquals("time(auto_gen)", timeColumn); + Assert.assertEquals("[]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(1, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("table1")); + Assert.assertEquals(6, timeseriesMetadataList.size()); + Assert.assertEquals(0, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(5, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } + + @Test + public void testMultiTable() throws IoTDBConnectionException, StatementExecutionException, IOException { + try (ITableSession session = + EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE_NAME)) { + + SessionDataSet sessionDataSet = session.executeQueryStatement("copy (select table1.time as time1, table1.tag1 as tag1_1, table1.tag2 as tag2_1, table1.s1 as s1_1, table1.s2 as s2_1, table2.s1 as s1_2, table2.s2 as s2_2 from table1 inner join table2 on table1.time = table2.time) to '9.tsfile'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String path = iterator.getString(1); + int rowCount = iterator.getInt(2); + int deviceCount = iterator.getInt(3); + long sizeInBytes = iterator.getLong(4); + String tableName = iterator.getString(5); + String timeColumn = iterator.getString(6); + String tagColumns = iterator.getString(7); + + Assert.assertTrue(new File(path).exists()); + Assert.assertEquals(2, rowCount); + Assert.assertEquals(1, deviceCount); + Assert.assertTrue(sizeInBytes > 0); + Assert.assertEquals("default", tableName); + Assert.assertEquals("time(auto_gen)", timeColumn); + Assert.assertEquals("[]", tagColumns); + + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata = + reader.getAllTimeseriesMetadata(false); + Assert.assertEquals(1, allTimeseriesMetadata.size()); + List<TimeseriesMetadata> timeseriesMetadataList = + allTimeseriesMetadata.get(new StringArrayDeviceID("default")); + Assert.assertEquals(8, timeseriesMetadataList.size()); + Assert.assertEquals(0, timeseriesMetadataList.get(0).getStatistics().getStartTime()); + Assert.assertEquals(1, timeseriesMetadataList.get(0).getStatistics().getEndTime()); + } + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java index a72c4d00bef..e5b68ea78a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.common.header; import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; @@ -90,7 +91,11 @@ public class DatasetHeader { } public void setTableColumnToTsBlockIndexMap(OutputNode outputNode) { - List<Symbol> childOutputSymbols = outputNode.getChild().getOutputSymbols(); + setTableColumnToTsBlockIndexMap(outputNode, outputNode.getChild()); + } + + public void setTableColumnToTsBlockIndexMap(OutputNode outputNode, PlanNode outputNodeChild) { + List<Symbol> childOutputSymbols = outputNodeChild.getOutputSymbols(); Map<Symbol, Integer> outputSymbolsIndexMap = new HashMap<>(childOutputSymbols.size()); for (int i = 0; i < childOutputSymbols.size(); i++) { outputSymbolsIndexMap.put(childOutputSymbols.get(i), i); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/CopyToOptions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/CopyToOptions.java new file mode 100644 index 00000000000..4e2d356bc4d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/CopyToOptions.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationPlan; + +import java.util.List; +import java.util.Set; + +public interface CopyToOptions { + + void infer(Analysis analysis, RelationPlan queryRelationPlan, List<ColumnHeader> columnHeaders); + + void check(List<ColumnHeader> columnHeaders); + + List<ColumnHeader> getRespColumnHeaders(); + + Format getFormat(); + + long estimatedMaxRamBytesInWrite(); + + enum Format { + TSFILE, + } + + class Builder { + private CopyToOptions.Format format = CopyToOptions.Format.TSFILE; + private String targetTableName = null; + private String targetTimeColumn = null; + private Set<String> targetTagColumns = null; + private long memoryThreshold = 0; + + public Builder withFormat(CopyToOptions.Format format) { + this.format = format; + return this; + } + + public Builder withTargetTableName(String targetTableName) { + this.targetTableName = targetTableName; + return this; + } + + public Builder withTargetTimeColumn(String targetTimeColumn) { + this.targetTimeColumn = targetTimeColumn; + return this; + } + + public Builder withTargetTagColumns(Set<String> targetTagColumns) { + this.targetTagColumns = targetTagColumns; + return this; + } + + public Builder withMemoryThreshold(long memoryThreshold) { + this.memoryThreshold = memoryThreshold; + return this; + } + + public CopyToOptions build() { + switch (format) { + case TSFILE: + default: + return new CopyToTsFileOptions( + targetTableName, targetTimeColumn, targetTagColumns, memoryThreshold); + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/IFormatCopyToWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/IFormatCopyToWriter.java new file mode 100644 index 00000000000..ae5d37d19e4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/IFormatCopyToWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.tsfile.read.common.block.TsBlock; + +import java.io.IOException; + +public interface IFormatCopyToWriter { + void write(TsBlock tsBlock) throws Exception; + + TsBlock buildResultTsBlock(); + + void seal() throws Exception; + + void close() throws IOException; +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java new file mode 100644 index 00000000000..7d2871fb238 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/TableCopyToOperator.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.TsFileFormatCopyToWriter; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class TableCopyToOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TableCopyToOperator.class); + + private final OperatorContext operatorContext; + private final Operator childOperator; + private final String targetFilePath; + private final CopyToOptions options; + private final List<ColumnHeader> innerQueryColumnHeaders; + private final List<Integer> columnIndex2TsBlockColumnIndexList; + + private IFormatCopyToWriter writer; + private File targetFile; + private boolean isFinished = false; + private boolean hasData = false; + + public TableCopyToOperator( + OperatorContext operatorContext, + Operator child, + String targetFilePath, + CopyToOptions options, + List<ColumnHeader> innerQueryColumnHeaders, + List<Integer> columnIndex2TsBlockColumnIndexList) { + this.operatorContext = operatorContext; + this.childOperator = child; + this.targetFilePath = targetFilePath; + this.options = options; + this.innerQueryColumnHeaders = innerQueryColumnHeaders; + this.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList; + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + long startTime = System.nanoTime(); + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + IFormatCopyToWriter formatWriter = getWriter(); + do { + if (!childOperator.hasNext()) { + isFinished = true; + break; + } + TsBlock tsBlock = childOperator.next(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + hasData = true; + formatWriter.write(tsBlock); + } while (System.nanoTime() - startTime < maxRuntime && !isFinished); + + if (isFinished) { + formatWriter.seal(); + return formatWriter.buildResultTsBlock(); + } + return null; + } + + private IFormatCopyToWriter getWriter() throws Exception { + if (writer != null) { + return writer; + } + this.targetFile = createTargetFile(targetFilePath); + switch (options.getFormat()) { + case TSFILE: + default: + this.writer = + new TsFileFormatCopyToWriter( + this.targetFile, + (CopyToTsFileOptions) options, + innerQueryColumnHeaders, + columnIndex2TsBlockColumnIndexList); + } + return writer; + } + + private File createTargetFile(String path) throws Exception { + File file = new File(path); + if (file.getParent() == null) { + String dir = TierManager.getInstance().getNextFolderForCopyToTargetFile(); + file = new File(dir, path); + } + File parent = file.getParentFile(); + if (parent != null && !parent.exists() && !parent.mkdirs()) { + throw new IOException("Failed to create directories: " + parent); + } + if (!file.createNewFile()) { + throw new IOException("Target file already exists: " + file.getAbsolutePath()); + } + + return file; + } + + @Override + public boolean hasNext() throws Exception { + return !isFinished; + } + + @Override + public ListenableFuture<?> isBlocked() { + return childOperator.isBlocked(); + } + + @Override + public void close() throws Exception { + childOperator.close(); + if (writer != null) { + writer.close(); + writer = null; + } + if (targetFile == null || (hasData && isFinished)) { + return; + } + Files.deleteIfExists(targetFile.toPath()); + } + + @Override + public boolean isFinished() throws Exception { + return isFinished; + } + + @Override + public long calculateMaxPeekMemory() { + return Math.max( + childOperator.calculateMaxPeekMemory(), + calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext()); + } + + @Override + public long calculateMaxReturnSize() { + return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return childOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + RamUsageEstimator.sizeOf(targetFilePath) + + RamUsageEstimator.sizeOfObject(innerQueryColumnHeaders) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfIntegerArrayList( + columnIndex2TsBlockColumnIndexList) + + options.estimatedMaxRamBytesInWrite() + + childOperator.ramBytesUsed(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/CopyToTsFileOptions.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/CopyToTsFileOptions.java new file mode 100644 index 00000000000..40a2b24bcf3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/CopyToTsFileOptions.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationPlan; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.utils.constant.SqlConstant; + +import org.apache.tsfile.enums.TSDataType; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CopyToTsFileOptions implements CopyToOptions { + + private static final String DEFAULT_TABLE_NAME = "default"; + + private String targetTableName; + private String targetTimeColumn; + private Set<String> targetTagColumns; + private final long targetMemoryThreshold; + + private boolean generateNewTimeColumn = false; + + public CopyToTsFileOptions( + String targetTableName, + String targetTimeColumn, + Set<String> targetTagColumns, + long targetMemoryThreshold) { + this.targetTableName = targetTableName; + this.targetTimeColumn = targetTimeColumn; + this.targetTagColumns = targetTagColumns; + this.targetMemoryThreshold = targetMemoryThreshold; + } + + public Format getFormat() { + return Format.TSFILE; + } + + public boolean isGenerateNewTimeColumn() { + return generateNewTimeColumn; + } + + @Override + public void infer( + Analysis analysis, RelationPlan queryRelationPlan, List<ColumnHeader> columnHeaders) { + List<Identifier> tables = queryRelationPlan.getScope().getTables(); + TsTable onlyOneQueriedTable = null; + if (tables != null && tables.size() == 1) { + onlyOneQueriedTable = + DataNodeTableCache.getInstance() + .getTable(analysis.getDatabaseName(), tables.get(0).toString(), false); + } + if (targetTableName == null) { + targetTableName = + onlyOneQueriedTable == null ? DEFAULT_TABLE_NAME : onlyOneQueriedTable.getTableName(); + } + if (onlyOneQueriedTable != null) { + if (targetTimeColumn == null || targetTagColumns == null) { + inferTimeAndTags(onlyOneQueriedTable, columnHeaders); + } + } + if (targetTimeColumn == null) { + generateNewTimeColumn = true; + targetTimeColumn = SqlConstant.TABLE_TIME_COLUMN_NAME; + } + if (targetTagColumns == null) { + targetTagColumns = Collections.emptySet(); + } + } + + private void inferTimeAndTags(TsTable tsTable, List<ColumnHeader> columnHeaders) { + Map<String, ColumnHeader> columnName2ColumnHeaderMapInDataset = new HashMap<>(); + for (ColumnHeader columnHeader : columnHeaders) { + columnName2ColumnHeaderMapInDataset.put(columnHeader.getColumnName(), columnHeader); + } + if (targetTagColumns == null) { + boolean canMatchAllTags = true; + List<TsTableColumnSchema> tagColumnsInTsTable = tsTable.getTagColumnSchemaList(); + for (TsTableColumnSchema tsTableColumnSchema : tagColumnsInTsTable) { + String columnName = tsTableColumnSchema.getColumnName(); + ColumnHeader columnHeaderInDataset = columnName2ColumnHeaderMapInDataset.get(columnName); + if (columnHeaderInDataset == null + || columnHeaderInDataset.getColumnType() != tsTableColumnSchema.getDataType()) { + canMatchAllTags = false; + break; + } + } + if (canMatchAllTags) { + this.targetTagColumns = new LinkedHashSet<>(tagColumnsInTsTable.size()); + for (TsTableColumnSchema tagColumn : tagColumnsInTsTable) { + targetTagColumns.add(tagColumn.getColumnName()); + } + } + } + + if (targetTimeColumn == null) { + String timeColumnInTsTable = tsTable.getTimeColumnName(); + if (timeColumnInTsTable != null) { + ColumnHeader timeColumnHeader = + columnName2ColumnHeaderMapInDataset.get(timeColumnInTsTable); + if (timeColumnHeader != null) { + this.targetTimeColumn = timeColumnHeader.getColumnName(); + } + } + if (targetTimeColumn == null + && columnName2ColumnHeaderMapInDataset.containsKey(SqlConstant.TABLE_TIME_COLUMN_NAME)) { + this.targetTimeColumn = SqlConstant.TABLE_TIME_COLUMN_NAME; + } + } + } + + private void inferTime() {} + + @Override + public void check(List<ColumnHeader> columnHeaders) { + if (generateNewTimeColumn && targetTagColumns.isEmpty()) { + return; + } + Set<String> columns = new HashSet<>(targetTagColumns.size()); + int foundTagColumns = 0; + for (ColumnHeader columnHeader : columnHeaders) { + if (!generateNewTimeColumn + && columnHeader.getColumnName().equals(targetTimeColumn) + && columnHeader.getColumnType() != TSDataType.TIMESTAMP) { + throw new SemanticException("Data type of target time column is not TIMESTAMP"); + } + if (targetTagColumns.contains(columnHeader.getColumnName())) { + if (columnHeader.getColumnType() != TSDataType.STRING) { + throw new SemanticException( + "Data type of tag column " + columnHeader.getColumnName() + " is not STRING"); + } + foundTagColumns++; + } + columns.add(columnHeader.getColumnName()); + } + if (columns.size() != columnHeaders.size()) { + throw new SemanticException("Duplicate column names in query dataset."); + } + if (foundTagColumns != targetTagColumns.size()) { + throw new SemanticException("Some specified tag columns are not exist in query dataset."); + } + if (foundTagColumns + (generateNewTimeColumn ? 0 : 1) == columns.size()) { + throw new SemanticException("Number of field columns should be larger than 0."); + } + } + + public List<ColumnHeader> getRespColumnHeaders() { + return ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS; + } + + @Override + public long estimatedMaxRamBytesInWrite() { + return targetMemoryThreshold; + } + + public String getTargetTableName() { + return targetTableName; + } + + public String getTargetTimeColumn() { + return targetTimeColumn; + } + + public Set<String> getTargetTagColumns() { + return targetTagColumns; + } + + public long getTargetMemoryThreshold() { + return targetMemoryThreshold; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/TsFileFormatCopyToWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/TsFileFormatCopyToWriter.java new file mode 100644 index 00000000000..073e101ca6a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/copyto/tsfile/TsFileFormatCopyToWriter.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.IFormatCopyToWriter; + +import org.apache.ratis.util.MemoizedCheckedSupplier; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TableTsBlock2TsFileWriter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class TsFileFormatCopyToWriter implements IFormatCopyToWriter { + private final File targetFile; + private final String targetTableName; + private final String targetTimeColumn; + private final Set<String> targetTagColumns; + private final boolean generateNewTimeColumn; + + private MemoizedCheckedSupplier<TableTsBlock2TsFileWriter, IOException> tsFileWriter; + private long rowCount = 0; + private long deviceCount = 0; + + public TsFileFormatCopyToWriter( + File file, + CopyToTsFileOptions copyToOptions, + List<ColumnHeader> innerQueryDatasetHeader, + List<Integer> columnIndex2TsBlockColumnIndexList) { + this.targetFile = file; + this.targetTableName = copyToOptions.getTargetTableName(); + this.targetTimeColumn = copyToOptions.getTargetTimeColumn(); + this.generateNewTimeColumn = copyToOptions.isGenerateNewTimeColumn(); + targetTagColumns = copyToOptions.getTargetTagColumns(); + + List<ColumnHeader> columnHeadersMatchChildTsBlock = + getColumnHeadersMatchTsBlock(innerQueryDatasetHeader, columnIndex2TsBlockColumnIndexList); + List<ColumnSchema> columnSchemas = + new ArrayList<>(columnHeadersMatchChildTsBlock.size() + (generateNewTimeColumn ? 1 : 0)); + Map<String, Integer> columnNameIndexMapInDatasetHeader = new HashMap<>(); + for (int i = 0; i < columnHeadersMatchChildTsBlock.size(); i++) { + columnNameIndexMapInDatasetHeader.put( + columnHeadersMatchChildTsBlock.get(i).getColumnName(), i); + } + // add time column + columnSchemas.add( + new ColumnSchema(targetTimeColumn, TSDataType.TIMESTAMP, ColumnCategory.TIME)); + int timeColumnIdxInQueryTsBlock = + generateNewTimeColumn ? -1 : columnNameIndexMapInDatasetHeader.get(targetTimeColumn); + + // add tag columns + int[] tagColumnIndexesInTsBlock = new int[targetTagColumns.size()]; + int arrIdx = 0; + for (String targetTagColumn : targetTagColumns) { + int tagIdx = columnNameIndexMapInDatasetHeader.get(targetTagColumn); + ColumnHeader tagColumnHeader = columnHeadersMatchChildTsBlock.get(tagIdx); + columnSchemas.add( + new ColumnSchema( + tagColumnHeader.getColumnName(), + tagColumnHeader.getColumnType(), + ColumnCategory.TAG)); + tagColumnIndexesInTsBlock[arrIdx++] = tagIdx; + } + // add field columns + int fieldColumnCount = + columnHeadersMatchChildTsBlock.size() + - tagColumnIndexesInTsBlock.length + - (generateNewTimeColumn ? 0 : 1); + int[] fieldColumnIndexesInQueryTsBlock = new int[fieldColumnCount]; + IMeasurementSchema[] fieldColumnSchemas = new IMeasurementSchema[fieldColumnCount]; + arrIdx = 0; + for (int i = 0; i < columnHeadersMatchChildTsBlock.size(); i++) { + ColumnHeader columnHeader = columnHeadersMatchChildTsBlock.get(i); + if (targetTagColumns.contains(columnHeader.getColumnName()) + || columnHeader.getColumnName().equals(targetTimeColumn)) { + continue; + } + columnSchemas.add( + new ColumnSchema( + columnHeader.getColumnName(), columnHeader.getColumnType(), ColumnCategory.FIELD)); + fieldColumnSchemas[arrIdx] = + new MeasurementSchema(columnHeader.getColumnName(), columnHeader.getColumnType()); + fieldColumnIndexesInQueryTsBlock[arrIdx] = i; + arrIdx++; + } + + TableSchema tableSchema = new TableSchema(targetTableName, columnSchemas); + this.tsFileWriter = + MemoizedCheckedSupplier.valueOf( + () -> + new TableTsBlock2TsFileWriter( + targetFile, + tableSchema, + copyToOptions.getTargetMemoryThreshold(), + generateNewTimeColumn, + timeColumnIdxInQueryTsBlock, + tagColumnIndexesInTsBlock, + fieldColumnIndexesInQueryTsBlock, + fieldColumnSchemas)); + } + + private List<ColumnHeader> getColumnHeadersMatchTsBlock( + List<ColumnHeader> queryOutputColumnHeaders, + List<Integer> columnIndex2TsBlockColumnIndexList) { + List<ColumnHeader> columnHeadersMatchTsBlock = new ArrayList<>(queryOutputColumnHeaders.size()); + for (int tsBlockColumnIndex : columnIndex2TsBlockColumnIndexList) { + columnHeadersMatchTsBlock.add(queryOutputColumnHeaders.get(tsBlockColumnIndex)); + } + return columnHeadersMatchTsBlock; + } + + @Override + public void write(TsBlock tsBlock) throws Exception { + tsFileWriter.get().write(tsBlock); + } + + @Override + public void seal() throws Exception { + if (!tsFileWriter.isInitialized()) { + return; + } + TableTsBlock2TsFileWriter writer = tsFileWriter.get(); + writer.close(); + // should call these methods after writer.close() + deviceCount = writer.getDeviceCount(); + rowCount = writer.getRowCount(); + tsFileWriter = null; + } + + @Override + public TsBlock buildResultTsBlock() { + TsBlockBuilder builder = + TsBlockBuilder.withMaxTsBlockSize( + 1024, + ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList())); + builder.getTimeColumnBuilder().writeLong(0); + builder.getValueColumnBuilders()[0].writeBinary( + new Binary(rowCount > 0 ? targetFile.getAbsolutePath() : "", TSFileConfig.STRING_CHARSET)); + builder.getValueColumnBuilders()[1].writeLong(rowCount); + builder.getValueColumnBuilders()[2].writeLong(deviceCount); + builder.getValueColumnBuilders()[3].writeLong(targetFile.length()); + builder.getValueColumnBuilders()[4].writeBinary( + new Binary(targetTableName, TSFileConfig.STRING_CHARSET)); + builder.getValueColumnBuilders()[5].writeBinary( + new Binary( + targetTimeColumn + (generateNewTimeColumn ? "(auto_gen)" : ""), + TSFileConfig.STRING_CHARSET)); + builder.getValueColumnBuilders()[6].writeBinary( + new Binary(targetTagColumns.toString(), TSFileConfig.STRING_CHARSET)); + builder.declarePosition(); + return builder.build(); + } + + @Override + public void close() throws IOException { + if (tsFileWriter == null || !tsFileWriter.isInitialized()) { + return; + } + tsFileWriter.get().close(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index dc4ff80dcfc..4a329200847 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableDateBinTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableSingleTimeWindowIterator; @@ -66,6 +67,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.TableSortOpera import org.apache.iotdb.db.queryengine.execution.operator.process.TableStreamSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.TableTopKOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.ValuesOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.TableCopyToOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.fill.IFill; import org.apache.iotdb.db.queryengine.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.BinaryConstantFill; @@ -190,6 +192,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationT import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -3473,6 +3476,31 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution operatorContext, operator, node.getQueryId(), node.isVerbose(), node.getTimeout()); } + @Override + public Operator visitCopyTo(CopyToNode node, LocalExecutionPlanContext context) { + PlanNode childNode = node.getChild(); + + DatasetHeader datasetHeader = node.getInnerQueryDatasetHeader(); + datasetHeader.setTableColumnToTsBlockIndexMap(node.getInnerQueryOutputNode(), childNode); + + Operator operator = childNode.accept(this, context); + + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + TableCopyToOperator.class.getSimpleName()); + return new TableCopyToOperator( + operatorContext, + operator, + node.getTargetFilePath(), + node.getCopyToOptions(), + datasetHeader.getColumnHeaders(), + datasetHeader.getColumnIndex2TsBlockColumnIndexList()); + } + @Override public Operator visitTableFunctionProcessor( TableFunctionProcessorNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 16ceb28be79..1bc3c961109 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -751,6 +751,11 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } + public R visitCopyTo( + org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode node, C context) { + return visitSingleChildProcess(node, context); + } + public R visitOutput( org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode node, C context) { return visitSingleChildProcess(node, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 4a0fe9daa57..e4034c9fed2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -243,6 +243,8 @@ public class Analysis implements IAnalysis { private DatasetHeader respDatasetHeader; + private DatasetHeader copyToChildQueryNodeRespDatasetHeader; + private boolean finishQueryAfterAnalyze; // indicate if value filter exists in query @@ -973,6 +975,15 @@ public class Analysis implements IAnalysis { this.respDatasetHeader = respDatasetHeader; } + public DatasetHeader getCopyToChildNodeRespDatasetHeader() { + return this.copyToChildQueryNodeRespDatasetHeader; + } + + public void setCopyToChildQueryNodeRespDatasetHeader( + DatasetHeader copyToChildQueryNodeRespDatasetHeader) { + this.copyToChildQueryNodeRespDatasetHeader = copyToChildQueryNodeRespDatasetHeader; + } + @Override public String getStatementType() { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index bc6d54c37e7..5cff579e836 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -68,6 +68,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Columns; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex; @@ -850,6 +851,13 @@ public class StatementAnalyzer { return visitQuery((Query) node.getStatement(), context); } + @Override + protected Scope visitCopyTo(CopyTo node, Optional<Scope> context) { + Scope innerQueryScope = visitQuery((Query) node.getQueryStatement(), context); + analysis.setScope(node, innerQueryScope); + return innerQueryScope; + } + @Override protected Scope visitExplainAnalyze(ExplainAnalyze node, Optional<Scope> context) { queryContext.setExplainType(ExplainType.EXPLAIN_ANALYZE); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java index 2d7ed174d2a..1cdee76d8db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.PredicateWithUncorrelatedScalarSubqueryReconstructor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; @@ -67,6 +68,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Log import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AbstractQueryDeviceWithCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AbstractTraverseDevice; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateOrUpdateDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; @@ -254,6 +256,9 @@ public class TableLogicalPlanner { if (statement instanceof ExplainAnalyze) { return planExplainAnalyze((ExplainAnalyze) statement, analysis); } + if (statement instanceof CopyTo) { + return createRelationPlan((CopyTo) statement, analysis); + } if (statement instanceof Insert) { return genInsertPlan(analysis, (Insert) statement); } @@ -330,7 +335,14 @@ public class TableLogicalPlanner { int columnNumber = 0; // TODO perfect the logic of outputDescriptor - if (queryContext.isExplainAnalyze() && !queryContext.isInnerTriggeredQuery()) { + if (plan.getRoot() instanceof CopyToNode) { + for (ColumnHeader columnHeader : + ((CopyToNode) plan.getRoot()).getCopyToOptions().getRespColumnHeaders()) { + outputs.add(new Symbol(columnHeader.getColumnName())); + names.add(columnHeader.getColumnName()); + columnHeaders.add(columnHeader); + } + } else if (queryContext.isExplainAnalyze() && !queryContext.isInnerTriggeredQuery()) { outputs.add(new Symbol(ColumnHeaderConstant.EXPLAIN_ANALYZE)); names.add(ColumnHeaderConstant.EXPLAIN_ANALYZE); columnHeaders.add(new ColumnHeader(ColumnHeaderConstant.EXPLAIN_ANALYZE, TSDataType.TEXT)); @@ -629,6 +641,48 @@ public class TableLogicalPlanner { Optional.empty()); } + private RelationPlan createRelationPlan(final CopyTo statement, final Analysis analysis) { + Statement innerQueryStatement = statement.getQueryStatement(); + RelationPlan innerQueryRelationPlan = planStatementWithoutOutput(analysis, innerQueryStatement); + + OutputNode outputNode = (OutputNode) createOutputPlan(innerQueryRelationPlan, analysis); + DatasetHeader innerQueryRespDatasetHeader = analysis.getRespDatasetHeader(); + + statement + .getOptions() + .infer(analysis, innerQueryRelationPlan, innerQueryRespDatasetHeader.getColumnHeaders()); + statement.getOptions().check(innerQueryRespDatasetHeader.getColumnHeaders()); + + PlanNode newRoot = + new CopyToNode( + queryContext.getQueryId().genPlanNodeId(), + innerQueryRelationPlan.getRoot(), + statement.getTargetFileName(), + statement.getOptions(), + // recording permittedOutputs of CopyToNode's child + getChildPermittedOutputs( + analysis, statement.getQueryStatement(), innerQueryRelationPlan), + innerQueryRespDatasetHeader, + outputNode); + return new RelationPlan( + newRoot, + innerQueryRelationPlan.getScope(), + innerQueryRelationPlan.getFieldMappings(), + Optional.empty()); + } + + private List<Symbol> getChildPermittedOutputs( + Analysis analysis, Statement innerQueryStatement, RelationPlan innerQueryPlan) { + RelationType outputDescriptor = analysis.getOutputDescriptor(innerQueryStatement); + ImmutableList.Builder<Symbol> childPermittedOutputs = ImmutableList.builder(); + for (Field field : outputDescriptor.getVisibleFields()) { + int fieldIndex = outputDescriptor.indexOf(field); + Symbol columnSymbol = innerQueryPlan.getSymbol(fieldIndex); + childPermittedOutputs.add(columnSymbol); + } + return childPermittedOutputs.build(); + } + private enum Stage { CREATED, OPTIMIZED, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 529bc9ffc57..79ea52597bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -64,6 +64,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationT import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; @@ -217,6 +218,13 @@ public class TableDistributedPlanGenerator return Collections.singletonList(node); } + @Override + public List<PlanNode> visitCopyTo(CopyToNode node, PlanContext context) { + final List<PlanNode> children = genResult(node.getChild(), context); + node.setChild(children.get(0)); + return Collections.singletonList(node); + } + @Override public List<PlanNode> visitOutput(final OutputNode node, final PlanContext context) { final List<PlanNode> childrenNodes = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index 7a64ada9ced..8e8a688811f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; @@ -101,6 +102,12 @@ public class TableModelTypeProviderExtractor { } } + @Override + public Void visitCopyTo(CopyToNode node, Void context) { + visitPlan(node.getChild(), context); + return null; + } + @Override public Void visitInto(IntoNode node, Void context) { node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneCopyToColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneCopyToColumns.java new file mode 100644 index 00000000000..d6f4cfb991e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneCopyToColumns.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableSet; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.copyTo; + +public class PruneCopyToColumns implements Rule<CopyToNode> { + private static final Pattern<CopyToNode> PATTERN = copyTo(); + + @Override + public Pattern<CopyToNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(CopyToNode node, Captures captures, Context context) { + return restrictChildOutputs( + context.getIdAllocator(), node, ImmutableSet.copyOf(node.getChildPermittedOutputs())) + .map(Result::ofPlanNode) + .orElse(Result.empty()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CopyToNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CopyToNode.java new file mode 100644 index 00000000000..f080b39eee0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CopyToNode.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.commons.schema.column.ColumnHeader; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; + +public class CopyToNode extends SingleChildProcessNode { + + private final String targetFilePath; + private final CopyToOptions copyToOptions; + private final List<Symbol> childPermittedOutputs; + private final OutputNode innerQueryOutputNode; + private final DatasetHeader innerQueryDatasetHeader; + + public CopyToNode( + PlanNodeId id, + PlanNode child, + String targetFilePath, + CopyToOptions copyToOptions, + List<Symbol> childPermittedOutputs, + DatasetHeader innerQueryDatasetHeader, + OutputNode innerQueryOutputNode) { + super(id); + this.child = child; + this.targetFilePath = targetFilePath; + this.copyToOptions = copyToOptions; + this.childPermittedOutputs = childPermittedOutputs; + this.innerQueryDatasetHeader = innerQueryDatasetHeader; + this.innerQueryOutputNode = innerQueryOutputNode; + } + + public DatasetHeader getInnerQueryDatasetHeader() { + return innerQueryDatasetHeader; + } + + public OutputNode getInnerQueryOutputNode() { + return innerQueryOutputNode; + } + + public String getTargetFilePath() { + return targetFilePath; + } + + public CopyToOptions getCopyToOptions() { + return copyToOptions; + } + + public List<Symbol> getChildPermittedOutputs() { + return childPermittedOutputs; + } + + @Override + public PlanNode clone() { + return new CopyToNode( + id, + child, + targetFilePath, + copyToOptions, + childPermittedOutputs, + innerQueryDatasetHeader, + innerQueryOutputNode); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitCopyTo(this, context); + } + + @Override + public List<Symbol> getOutputSymbols() { + return ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS.stream() + .map(column -> new Symbol(column.getColumnName())) + .collect(Collectors.toList()); + } + + @Override + public List<String> getOutputColumnNames() { + return ColumnHeaderConstant.COPY_TO_TSFILE_COLUMN_HEADERS.stream() + .map(ColumnHeader::getColumnName) + .collect(Collectors.toList()); + } + + @Override + public PlanNode replaceChildren(List<PlanNode> newChildren) { + return new CopyToNode( + id, + newChildren.get(0), + targetFilePath, + copyToOptions, + childPermittedOutputs, + innerQueryDatasetHeader, + innerQueryOutputNode); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + throw new UnsupportedOperationException("CopyToNode should not be serialized"); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + throw new UnsupportedOperationException("CopyToNode should not be serialized"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java index c9bf429b183..d661862a743 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java @@ -137,6 +137,10 @@ public final class Patterns { return typeOf(ExplainAnalyzeNode.class); } + public static Pattern<CopyToNode> copyTo() { + return typeOf(CopyToNode.class); + } + public static Pattern<ProjectNode> project() { return typeOf(ProjectNode.class); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index 864b9a987ab..afe68803948 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyCorrelation; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplySourceColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAssignUniqueIdColumns; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCopyToColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneDistinctAggregation; @@ -147,6 +148,7 @@ public class LogicalOptimizeFactory { new PruneOffsetColumns(), new PruneOutputSourceColumns(), new PruneExplainAnalyzeColumns(), + new PruneCopyToColumns(), new PruneProjectColumns(), new PruneSortColumns(), new PruneTableFunctionProcessorColumns(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index b7f2e9eaaa8..f68daef681b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEva import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; @@ -388,6 +389,24 @@ public class UnaliasSymbolReferences implements PlanOptimizer { mapping); } + @Override + public PlanAndMappings visitCopyTo(CopyToNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + List<Symbol> newChildPermittedOutputs = mapper.map(node.getChildPermittedOutputs()); + return new PlanAndMappings( + new CopyToNode( + node.getPlanNodeId(), + rewrittenSource.getRoot(), + node.getTargetFilePath(), + node.getCopyToOptions(), + newChildPermittedOutputs, + node.getInnerQueryDatasetHeader(), + node.getInnerQueryOutputNode()), + mapping); + } + @Override public PlanAndMappings visitMarkDistinct(MarkDistinctNode node, UnaliasContext context) { PlanAndMappings rewrittenSource = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 13925ae3920..0d06b7793c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -89,6 +89,10 @@ public abstract class AstVisitor<R, C> { return visitStatement(node, context); } + protected R visitCopyTo(CopyTo node, C context) { + return visitStatement(node, context); + } + protected R visitExplainAnalyze(ExplainAnalyze node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CopyTo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CopyTo.java new file mode 100644 index 00000000000..97682024030 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CopyTo.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class CopyTo extends Statement { + + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(CopyTo.class); + private final Statement queryStatement; + private final String targetFileName; + private final CopyToOptions options; + + public CopyTo(Statement queryStatement, String targetFileName, CopyToOptions options) { + this(null, queryStatement, targetFileName, options); + } + + public CopyTo( + @Nullable NodeLocation location, + Statement queryStatement, + String targetFileName, + CopyToOptions options) { + super(location); + this.queryStatement = queryStatement; + this.targetFileName = targetFileName; + this.options = options; + } + + public Statement getQueryStatement() { + return queryStatement; + } + + public String getTargetFileName() { + return targetFileName; + } + + public CopyToOptions getOptions() { + return options; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitCopyTo(this, context); + } + + @Override + public List<Node> getChildren() { + return ImmutableList.<Node>builder().add(queryStatement).build(); + } + + @Override + public int hashCode() { + return Objects.hash(queryStatement); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + CopyTo o = (CopyTo) obj; + return Objects.equals(queryStatement, o.queryStatement); + } + + @Override + public String toString() { + return toStringHelper(this).add("statement", queryStatement).toString(); + } + + @Override + public long ramBytesUsed() { + long size = INSTANCE_SIZE; + size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(queryStatement); + return size; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DefaultTraversalVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DefaultTraversalVisitor.java index f22af4b020e..b23284ec477 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DefaultTraversalVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DefaultTraversalVisitor.java @@ -86,6 +86,12 @@ public abstract class DefaultTraversalVisitor<C> extends AstVisitor<Void, C> { return null; } + @Override + protected Void visitCopyTo(CopyTo node, C context) { + process(node.getQueryStatement(), context); + return null; + } + @Override protected Void visitExplainAnalyze(ExplainAnalyze node, C context) { process(node.getStatement(), context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 1721ad9b80d..ec3ebdca9d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.CopyToOptions; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns; @@ -55,6 +56,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpressio import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Columns; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; @@ -248,6 +250,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ZeroOrMoreQuantifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ZeroOrOneQuantifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil; +import org.apache.iotdb.db.queryengine.plan.relational.sql.util.QueryUtil; import org.apache.iotdb.db.queryengine.plan.relational.type.AuthorRType; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; @@ -290,6 +293,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -332,6 +336,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SkipTo.ski import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SkipTo.skipToFirst; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SkipTo.skipToLast; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SkipTo.skipToNextRow; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.QueryUtil.selectList; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.util.QueryUtil.table; import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.currPrecision; import static org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_COUNT_DISTINCT; import static org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_MOST_FREQUENT; @@ -2155,6 +2161,62 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { } } + @Override + public Node visitCopyToStatement(RelationalSqlParser.CopyToStatementContext ctx) { + RelationalSqlParser.CopyToStatementOptionsContext optionsContext = ctx.copyToStatementOptions(); + String targetFileName = parseStringLiteral(ctx.fileName.getText()); + CopyToOptions.Builder copyToOptionsBuilder = new CopyToOptions.Builder(); + if (optionsContext != null) { + for (RelationalSqlParser.CopyToStatementOptionContext context : + optionsContext.copyToStatementOption()) { + addCopyToOption(copyToOptionsBuilder, context); + } + } + Statement queryNode = null; + if (ctx.tableName != null) { + QualifiedName qualifiedName = getQualifiedName(ctx.tableName); + if (ctx.tableColumns != null) { + List<RelationalSqlParser.IdentifierContext> identifierList = + ctx.identifierList().identifier(); + SelectItem[] selectItems = new SelectItem[identifierList.size()]; + for (int i = 0; i < identifierList.size(); i++) { + Identifier identifier = (Identifier) visit(identifierList.get(i)); + selectItems[i] = new SingleColumn(identifier, identifier); + } + queryNode = QueryUtil.simpleQuery(selectList(selectItems), table(qualifiedName)); + } else { + queryNode = QueryUtil.simpleQuery(selectList(new AllColumns()), table(qualifiedName)); + } + } else { + queryNode = (Statement) visit(ctx.query()); + } + return new CopyTo(queryNode, targetFileName, copyToOptionsBuilder.build()); + } + + private void addCopyToOption( + CopyToOptions.Builder builder, RelationalSqlParser.CopyToStatementOptionContext context) { + if (context.FORMAT() != null) { + Identifier formatIdentifier = (Identifier) visit(context.identifier()); + builder.withFormat(CopyToOptions.Format.valueOf(formatIdentifier.getValue().toUpperCase())); + } else if (context.TABLE() != null) { + Identifier targetTableIdentifier = (Identifier) visit(context.identifier()); + builder.withTargetTableName(targetTableIdentifier.getValue()); + } else if (context.TIME() != null) { + Identifier targetTimeColumnIdentifier = (Identifier) visit(context.identifier()); + builder.withTargetTimeColumn(targetTimeColumnIdentifier.getValue()); + } else if (context.TAGS() != null) { + List<RelationalSqlParser.IdentifierContext> identifierList = + context.identifierList().identifier(); + Set<String> targetTagColumns = new LinkedHashSet<>(identifierList.size()); + for (RelationalSqlParser.IdentifierContext identifierContext : identifierList) { + targetTagColumns.add(((Identifier) visit(identifierContext)).getValue()); + } + builder.withTargetTagColumns(targetTagColumns); + } else if (context.MEMORY_THRESHOLD() != null) { + builder.withMemoryThreshold(parseLong(context.INTEGER_VALUE().getText())); + } + } + // ********************** query expressions ******************** @Override public Node visitQuery(RelationalSqlParser.QueryContext ctx) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java index 0c1d862a886..cd3e027c442 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/SqlFormatter.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterPipe; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CopyTo; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreatePipe; @@ -643,6 +644,19 @@ public final class SqlFormatter { return null; } + @Override + protected Void visitCopyTo(CopyTo node, Integer context) { + builder.append("COPY\n"); + builder.append("(\n"); + process(node.getQueryStatement(), context); + builder.append("\n) "); + builder.append("TO "); + builder.append('\''); + builder.append(node.getTargetFileName()); + builder.append('\''); + return null; + } + @Override protected Void visitExplainAnalyze(ExplainAnalyze node, Integer indent) { builder.append("EXPLAIN ANALYZE"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index 36bbe1cf063..ed9ec898cf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java @@ -80,6 +80,10 @@ public class TierManager { private List<String> objectDirs; + private List<String> copyToTargetDirs; + + private FolderManager copyToFolderManager; + /** total space of each tier, Long.MAX_VALUE when one tier contains remote storage */ private long[] tierDiskTotalSpace; @@ -161,6 +165,23 @@ public class TierManager { unSeqDir2TierLevel.put(dir, tierLevel); } + if (tierLevel == 0) { + copyToTargetDirs = + Arrays.stream(tierDirs[tierLevel]) + .filter(Objects::nonNull) + .map( + v -> + FSFactoryProducer.getFSFactory() + .getFile(v, IoTDBConstant.COPY_TO_TARGET_FOLDER_NAME) + .getPath()) + .collect(Collectors.toList()); + try { + copyToFolderManager = new FolderManager(copyToTargetDirs, directoryStrategyType); + } catch (DiskSpaceInsufficientException e) { + logger.error("All disks of tier {} are full.", tierLevel, e); + } + } + objectDirs = Arrays.stream(tierDirs[tierLevel]) .filter(Objects::nonNull) @@ -226,6 +247,10 @@ public class TierManager { : unSeqTiers.get(tierLevel).getNextFolder(); } + public String getNextFolderForCopyToTargetFile() throws DiskSpaceInsufficientException { + return copyToFolderManager.getNextFolder(); + } + public String getNextFolderForObjectFile() throws DiskSpaceInsufficientException { return objectTiers.get(0).getNextFolder(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index a94f472b606..03e9a53db60 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -252,6 +252,7 @@ public class IoTDBConstant { public static final String SEQUENCE_FOLDER_NAME = "sequence"; public static final String UNSEQUENCE_FOLDER_NAME = "unsequence"; public static final String OBJECT_FOLDER_NAME = "object"; + public static final String COPY_TO_TARGET_FOLDER_NAME = "copy_to"; public static final String FILE_NAME_SEPARATOR = "-"; public static final String CONSENSUS_FOLDER_NAME = "consensus"; public static final String DATA_REGION_FOLDER_NAME = "data_region"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 219cd310be1..d0b1579f9d3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -353,6 +353,11 @@ public class ColumnHeaderConstant { public static final String SIZE_IN_BYTES_TABLE_MODEL = "size_in_bytes"; public static final String TIME_PARTITION_TABLE_MODEL = "time_partition"; + public static final String ROW_COUNT = "row_count"; + public static final String DEVICE_COUNT = "device_count"; + public static final String TIME_COLUMN = "time_column"; + public static final String TAG_COLUMNS = "tag_columns"; + public static final List<ColumnHeader> lastQueryColumnHeaders = ImmutableList.of( new ColumnHeader(TIMESERIES, TSDataType.TEXT), @@ -789,4 +794,14 @@ public class ColumnHeaderConstant { new ColumnHeader(SHOW_CONFIGURATIONS_VALUE, TSDataType.TEXT), new ColumnHeader(SHOW_CONFIGURATIONS_DEFAULT_VALUE, TSDataType.TEXT), new ColumnHeader(SHOW_CONFIGURATIONS_DESCRIPTION, TSDataType.TEXT)); + + public static final List<ColumnHeader> COPY_TO_TSFILE_COLUMN_HEADERS = + ImmutableList.of( + new ColumnHeader(PATH.toLowerCase(), TSDataType.TEXT), + new ColumnHeader(ROW_COUNT, TSDataType.INT64), + new ColumnHeader(DEVICE_COUNT, TSDataType.INT64), + new ColumnHeader(SIZE_IN_BYTES_TABLE_MODEL, TSDataType.INT64), + new ColumnHeader(TABLE_NAME_TABLE_MODEL, TSDataType.STRING), + new ColumnHeader(TIME_COLUMN, TSDataType.STRING), + new ColumnHeader(TAG_COLUMNS, TSDataType.STRING)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 48cd81b39e1..9fbf41441f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -159,6 +159,20 @@ public class TsTable { } } + public String getTimeColumnName() { + readWriteLock.readLock().lock(); + try { + for (Map.Entry<String, TsTableColumnSchema> entry : columnSchemaMap.entrySet()) { + if (entry.getValue().getColumnCategory() == TsTableColumnCategory.TIME) { + return entry.getKey(); + } + } + } finally { + readWriteLock.readLock().unlock(); + } + return null; + } + public int getTagColumnOrdinal(final String columnName) { readWriteLock.readLock().lock(); try { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index c8c30ae3d6d..fb24f441203 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -187,6 +187,9 @@ statement | executeImmediateStatement | deallocateStatement + // Copy Statement + | copyToStatement + // View, Trigger, CQ, Quota are not supported yet ; @@ -908,6 +911,24 @@ deallocateStatement : DEALLOCATE PREPARE statementName=identifier ; +// ---------------------------------------- Copy Statement --------------------------------------------------------- +copyToStatement + : COPY '(' query ')' TO fileName=string ((WITH)? copyToStatementOptions)? + | COPY tableName=qualifiedName ('(' tableColumns=identifierList ')')? TO fileName=string ((WITH)? copyToStatementOptions)? + ; + +copyToStatementOptions + : '(' copyToStatementOption (',' copyToStatementOption)* ')' + ; + +copyToStatementOption + : FORMAT identifier + | TABLE identifier + | TAGS '(' identifierList ')' + | TIME identifier + | MEMORY_THRESHOLD memory=INTEGER_VALUE + ; + // ------------------------------------------- Query Statement --------------------------------------------------------- queryStatement : query #statementDefault @@ -1452,7 +1473,7 @@ nonReserved // IMPORTANT: this rule must only contain tokens. Nested rules are not supported. See SqlParser.exitNonReserved : ABSENT | ADD | ADMIN | AFTER | ALL | ANALYZE | ANY | ARRAY | ASC | AT | ATTRIBUTE | AUDIT | AUTHORIZATION | AVAILABLE | BEGIN | BERNOULLI | BOTH - | CACHE | CALL | CALLED | CASCADE | CATALOG | CATALOGS | CHAR | CHARACTER | CHARSET | CLEAR | CLUSTER | CLUSTERID | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITION | CONDITIONAL | CONFIGNODES | CONFIGNODE | CONFIGURATION | CONNECTOR | CONSTANT | COPARTITION | COUNT | CURRENT + | CACHE | CALL | CALLED | CASCADE | CATALOG | CATALOGS | CHAR | CHARACTER | CHARSET | CLEAR | CLUSTER | CLUSTERID | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITION | CONDITIONAL | CONFIGNODES | CONFIGNODE | CONFIGURATION | CONNECTOR | CONSTANT | COPARTITION | COPY | COUNT | CURRENT | DATA | DATABASE | DATABASES | DATANODE | DATANODES | DATASET | DATE | DAY | DEBUG | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETAILS| DETERMINISTIC | DEVICES | DISTRIBUTED | DO | DOUBLE | ELSEIF | EMPTY | ENCODING | ERROR | EXCLUDING | EXPLAIN | EXTRACTOR | FETCH | FIELD | FILTER | FINAL | FIRST | FLUSH | FOLLOWING | FORCEDLY | FORMAT | FUNCTION | FUNCTIONS @@ -1462,7 +1483,7 @@ nonReserved | JSON | KEEP | KEY | KEYS | KILL | LANGUAGE | LAST | LATERAL | LEADING | LEAVE | LEVEL | LIMIT | LINEAR | LOAD | LOCAL | LOGICAL | LOOP - | MANAGE_ROLE | MANAGE_USER | MAP | MATCH | MATCHED | MATCHES | MATCH_RECOGNIZE | MATERIALIZED | MEASURES | METHOD | MERGE | MICROSECOND | MIGRATE | MILLISECOND | MINUTE | MODEL | MODELS | MODIFY | MONTH + | MANAGE_ROLE | MANAGE_USER | MAP | MATCH | MATCHED | MATCHES | MATCH_RECOGNIZE | MATERIALIZED | MEASURES | MEMORY_THRESHOLD | METHOD | MERGE | MICROSECOND | MIGRATE | MILLISECOND | MINUTE | MODEL | MODELS | MODIFY | MONTH | NANOSECOND | NESTED | NEXT | NFC | NFD | NFKC | NFKD | NO | NODEID | NONE | NULLIF | NULLS | OBJECT | OF | OFFSET | OMIT | ONE | ONLY | OPTION | ORDINALITY | OUTPUT | OVER | OVERFLOW | PARTITION | PARTITIONS | PASSING | PAST | PATH | PATTERN | PER | PERIOD | PERMUTE | PIPE | PIPEPLUGIN | PIPEPLUGINS | PIPES | PLAN | POSITION | PRECEDING | PRECISION | PRIVILEGES | PREVIOUS | PROCESSLIST | PROCESSOR | PROPERTIES | PRUNE @@ -1470,7 +1491,7 @@ nonReserved | RANGE | READ | READONLY | RECONSTRUCT | REFRESH | REGION | REGIONID | REGIONS | REMOVE | RENAME | REPAIR | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROOT | ROW | ROWS | RPR_FIRST | RPR_LAST | RUNNING | SERIESSLOTID | SERVICE | SERVICES | SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | SERIALIZABLE | SESSION | SET | SETS | SECURITY | SHOW | SINK | SOME | SOURCE | START | STATS | STOP | SUBSCRIPTION | SUBSCRIPTIONS | SUBSET | SUBSTRING | SYSTEM - | TABLES | TABLESAMPLE | TAG | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION | TIMER | TIMER_XL | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE + | TABLES | TABLESAMPLE | TAG | TAGS | TEXT | TEXT_STRING | TIES | TIME | TIMEPARTITION | TIMER | TIMER_XL | TIMESERIES | TIMESLOTID | TIMESTAMP | TO | TOPIC | TOPICS | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE | UNBOUNDED | UNCOMMITTED | UNCONDITIONAL | UNIQUE | UNKNOWN | UNMATCHED | UNTIL | UPDATE | URI | URLS | USE | USED | USER | UTF16 | UTF32 | UTF8 | VALIDATE | VALUE | VARIABLES | VARIATION | VERBOSE | VERSION | VIEW | WEEK | WHILE | WINDOW | WITHIN | WITHOUT | WORK | WRAPPER | WRITE @@ -1533,6 +1554,7 @@ CONSTANT: 'CONSTANT'; CONSTRAINT: 'CONSTRAINT'; COUNT: 'COUNT'; COPARTITION: 'COPARTITION'; +COPY: 'COPY'; CREATE: 'CREATE'; CROSS: 'CROSS'; CUBE: 'CUBE'; @@ -1681,6 +1703,7 @@ MATCHES: 'MATCHES'; MATCH_RECOGNIZE: 'MATCH_RECOGNIZE'; MATERIALIZED: 'MATERIALIZED'; MEASURES: 'MEASURES'; +MEMORY_THRESHOLD: 'MEMORY_THRESHOLD'; METHOD: 'METHOD'; MERGE: 'MERGE'; MICROSECOND: 'US'; @@ -1816,6 +1839,7 @@ TABLE: 'TABLE'; TABLES: 'TABLES'; TABLESAMPLE: 'TABLESAMPLE'; TAG: 'TAG'; +TAGS: 'TAGS'; TEXT: 'TEXT'; TEXT_STRING: 'STRING'; THEN: 'THEN';
