This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch read_tsfile_table_function in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 194cd3076122a636aecb1e4c34bb81e5bde52745 Author: shuwenwei <[email protected]> AuthorDate: Wed Jun 10 17:59:34 2026 +0800 fix bug --- .../java/org/apache/iotdb/it/env/EnvFactory.java | 2 +- .../recent/IoTDBReadTsFileTableFunctionIT.java | 388 +++++++++++++++++++++ .../fragment/FragmentInstanceContext.java | 79 ++--- .../relational/AbstractAggTableScanOperator.java | 42 +-- .../AbstractDeviceTableScanOperator.java | 78 ----- .../relational/AbstractTableScanOperator.java | 50 ++- .../ExternalTsFileAggTableScanOperator.java | 112 +++--- .../relational/ExternalTsFileSeriesScanUtil.java | 21 +- .../ExternalTsFileTableScanOperator.java | 104 +++--- .../source/relational/TableScanOperator.java | 2 +- .../TreeAlignedDeviceViewScanOperator.java | 2 +- .../planner/DataNodeTableOperatorGenerator.java | 27 +- .../readTsFile/ExternalTsFileQueryDataSource.java | 67 ++++ .../readTsFile/ExternalTsFileQueryResource.java | 278 +++++++++------ .../distribute/TableDistributedPlanGenerator.java | 27 +- 15 files changed, 864 insertions(+), 415 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java index 2fbc9672837..2855bec5b7b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/EnvFactory.java @@ -43,7 +43,7 @@ public class EnvFactory { try { Class.forName(Config.JDBC_DRIVER_NAME); logger.info(">>>>>>>{}", System.getProperty("TestEnv")); - EnvType envType = EnvType.getSystemEnvType(); + EnvType envType = EnvType.Remote; switch (envType) { case Simple: case TABLE_SIMPLE: diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java new file mode 100644 index 00000000000..ed236b901ac --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBReadTsFileTableFunctionIT { + private static final String DATABASE_NAME = "test_read_tsfile"; + + private static File tmpDir; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE " + DATABASE_NAME); + } + } + + @Before + public void setUpBeforeTest() throws IOException { + tmpDir = new File(Files.createTempDirectory("read-tsfile").toUri()); + } + + @After + public void tearDownAfterTest() { + deleteTmpDir(); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testReadSingleTsFile() throws Exception { + File tsFile = new File(tmpDir, "single.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile)) { + generateTable( + writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2); + generateTable(writer, "table2", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + } + + String[] expectedHeader = new String[] {"time", "tag1", "tag2", "s1", "s2"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,tag1_1,tag2_1,1,1,", + "1970-01-01T00:00:00.002Z,tag1_1,tag2_1,2,2,", + "1970-01-01T00:00:00.001Z,tag1_2,tag2_2,1,1,", + "1970-01-01T00:00:00.002Z,tag1_2,tag2_2,2,2,", + }; + tableResultSetEqualTest( + "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile) + + "', TABLE_NAME => 'table1') ORDER BY tag1, tag2, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testReadTsFileWithNoMatchedDevice() throws Exception { + File tsFile = new File(tmpDir, "empty-device.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile)) { + generateTable( + writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2); + } + + tableResultSetEqualTest( + "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile) + + "', TABLE_NAME => 'table1') WHERE tag1 = 'not_exists'", + new String[] {"time", "tag1", "tag2", "s1", "s2"}, + new String[] {}, + DATABASE_NAME); + } + + @Test + public void testReadTsFileAggregationWithNoMatchedDevice() throws Exception { + File tsFile = new File(tmpDir, "empty-device-aggregation.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile)) { + generateTable( + writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2); + } + + tableResultSetEqualTest( + "SELECT count(*) AS count_star, count(s1) AS count_s1, sum(s1) AS sum_s1" + + " FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile) + + "', TABLE_NAME => 'table1') WHERE tag1 = 'not_exists'", + new String[] {"count_star", "count_s1", "sum_s1"}, + new String[] {"0,0,null,"}, + DATABASE_NAME); + } + + @Test + public void testReadMultipleTsFilesWithDeviceFilter() throws Exception { + File tsFile1 = new File(tmpDir, "multi-1.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile1)) { + generateTable( + writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 1, 2); + } + File tsFile2 = new File(tmpDir, "multi-2.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile2)) { + generateTable( + writer, "table1", Arrays.asList("tag1", "tag2"), Arrays.asList("s1", "s2"), 3, 4); + } + + String[] expectedHeader = new String[] {"time", "tag1", "tag2", "s1", "s2"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,tag1_1,tag2_1,1,1,", + "1970-01-01T00:00:00.002Z,tag1_1,tag2_1,2,2,", + "1970-01-01T00:00:00.003Z,tag1_1,tag2_1,3,3,", + "1970-01-01T00:00:00.004Z,tag1_1,tag2_1,4,4,", + }; + tableResultSetEqualTest( + "SELECT time, tag1, tag2, s1, s2 FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile1) + + "," + + toSqlPath(tsFile2) + + "', TABLE_NAME => 'table1') WHERE tag1 = 'tag1_1' ORDER BY time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testReadMultipleTsFilesWithSchemaMerge() throws Exception { + File tsFile1 = new File(tmpDir, "schema-merge-1.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile1)) { + generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + } + File tsFile2 = new File(tmpDir, "schema-merge-2.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile2)) { + generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1", "s2"), 3, 4); + } + + String[] expectedHeader = new String[] {"time", "tag1", "s1", "s2"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,tag1_1,1,null,", + "1970-01-01T00:00:00.002Z,tag1_1,2,null,", + "1970-01-01T00:00:00.003Z,tag1_1,3,3,", + "1970-01-01T00:00:00.004Z,tag1_1,4,4,", + "1970-01-01T00:00:00.001Z,tag1_2,1,null,", + "1970-01-01T00:00:00.002Z,tag1_2,2,null,", + "1970-01-01T00:00:00.003Z,tag1_2,3,3,", + "1970-01-01T00:00:00.004Z,tag1_2,4,4,", + }; + tableResultSetEqualTest( + "SELECT time, tag1, s1, s2 FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile1) + + "," + + toSqlPath(tsFile2) + + "', TABLE_NAME => 'table1') ORDER BY tag1, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testReadMultipleTsFilesWithConflictingFieldType() throws Exception { + File tsFile1 = new File(tmpDir, "conflict-1.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile1)) { + generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + } + File tsFile2 = new File(tmpDir, "conflict-2.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile2)) { + generateTable( + writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), TSDataType.DOUBLE, 3, 4); + } + + tableAssertTestFail( + "SELECT * FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile1) + + "," + + toSqlPath(tsFile2) + + "', TABLE_NAME => 'table1')", + "has conflicting data types when merging table schema", + DATABASE_NAME); + } + + @Test + public void testReadMultipleTsFilesWithConflictingTagColumns() throws Exception { + File tsFile1 = new File(tmpDir, "tag-conflict-1.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile1)) { + generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + } + File tsFile2 = new File(tmpDir, "tag-conflict-2.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile2)) { + generateTable(writer, "table1", Arrays.asList("tag2"), Arrays.asList("s1"), 3, 4); + } + + tableAssertTestFail( + "SELECT * FROM read_tsfile(PATHS => '" + + toSqlPath(tsFile1) + + "," + + toSqlPath(tsFile2) + + "', TABLE_NAME => 'table1')", + "Tag columns conflict when merging table schema", + DATABASE_NAME); + } + + @Test + public void testReadTsFileWithoutTableNameWhenMultipleTablesExist() throws Exception { + File tsFile = new File(tmpDir, "multiple-tables.tsfile"); + try (TsFileWriter writer = new TsFileWriter(tsFile)) { + generateTable(writer, "table1", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + generateTable(writer, "table2", Arrays.asList("tag1"), Arrays.asList("s1"), 1, 2); + } + + tableAssertTestFail( + "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(tsFile) + "')", + "Cannot infer table name from TsFile because multiple tables are found", + DATABASE_NAME); + } + + @Test + public void testReadTsFileWithInvalidPaths() throws IOException { + File missingFile = new File(tmpDir, "missing.tsfile"); + tableAssertTestFail( + "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(missingFile) + "')", + "TsFile path does not exist", + DATABASE_NAME); + + DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); + File dataDir = + new File(dataNodeWrapper.getDataNodeDir() + File.separator + "data", "forbidden.tsfile"); + Files.createDirectories(dataDir.getParentFile().toPath()); + Files.write(dataDir.toPath(), new byte[0]); + tableAssertTestFail( + "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(dataDir) + "')", + "is not allowed because it may access IoTDB data directory", + DATABASE_NAME); + } + + private static void generateTable( + TsFileWriter writer, + String tableName, + List<String> tagColumns, + List<String> fieldColumns, + long startTime, + long endTime) + throws IOException, WriteProcessException { + generateTable( + writer, tableName, tagColumns, fieldColumns, TSDataType.INT64, startTime, endTime); + } + + private static void generateTable( + TsFileWriter writer, + String tableName, + List<String> tagColumns, + List<String> fieldColumns, + TSDataType fieldType, + long startTime, + long endTime) + throws IOException, WriteProcessException { + List<String> columnNames = new ArrayList<>(tagColumns.size() + fieldColumns.size()); + List<TSDataType> columnTypes = new ArrayList<>(tagColumns.size() + fieldColumns.size()); + List<ColumnCategory> columnCategories = + new ArrayList<>(tagColumns.size() + fieldColumns.size()); + for (String tagColumn : tagColumns) { + columnNames.add(tagColumn); + columnTypes.add(TSDataType.STRING); + columnCategories.add(ColumnCategory.TAG); + } + for (String fieldColumn : fieldColumns) { + columnNames.add(fieldColumn); + columnTypes.add(fieldType); + columnCategories.add(ColumnCategory.FIELD); + } + + writer.registerTableSchema( + new TableSchema(tableName, columnNames, columnTypes, columnCategories)); + Tablet tablet = new Tablet(tableName, columnNames, columnTypes, columnCategories); + for (int deviceIndex = 1; deviceIndex <= 2; deviceIndex++) { + for (long time = startTime; time <= endTime; time++) { + int row = tablet.getRowSize(); + tablet.addTimestamp(row, time); + for (int i = 0; i < tagColumns.size(); i++) { + tablet.addValue(row, i, tagColumns.get(i) + "_" + deviceIndex); + } + for (int i = 0; i < fieldColumns.size(); i++) { + addFieldValue(tablet, row, tagColumns.size() + i, fieldType, time); + } + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + writer.writeTable(tablet); + tablet.reset(); + } + } + } + if (tablet.getRowSize() != 0) { + writer.writeTable(tablet); + } + } + + private static void addFieldValue( + Tablet tablet, int row, int column, TSDataType fieldType, long time) { + if (fieldType == TSDataType.DOUBLE) { + tablet.addValue(row, column, (double) time); + return; + } + tablet.addValue(row, column, time); + } + + private static String toSqlPath(File file) { + return file.getAbsolutePath().replace("\\", "\\\\").replace("'", "''"); + } + + private static void clearTmpDir() { + if (tmpDir == null || !tmpDir.exists()) { + return; + } + File[] files = tmpDir.listFiles(); + if (files != null) { + for (File file : files) { + try { + Files.delete(file.toPath()); + } catch (IOException ignored) { + // ignore + } + } + } + } + + private static void deleteTmpDir() { + clearTmpDir(); + if (tmpDir == null || !tmpDir.exists()) { + return; + } + try { + Files.delete(tmpDir.toPath()); + } catch (IOException ignored) { + // ignore + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 8ae3e885f5e..24c28e668b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -46,6 +46,8 @@ import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; @@ -56,8 +58,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; @@ -71,8 +71,6 @@ import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.time.ZoneId; import java.time.format.DateTimeParseException; import java.util.ArrayList; @@ -121,8 +119,7 @@ public class FragmentInstanceContext extends QueryContext { // Used for region scan, relating methods are to be added. private Map<IDeviceID, DeviceContext> devicePathsToContext; - private List<String> externalTsFilePaths; - private List<TsFileResource> externalTsFileResources; + private ExternalTsFileQueryResource externalTsFileQueryResource; // Shared by all scan operators in this fragment instance to avoid memory problem protected IQueryDataSource sharedQueryDataSource; @@ -624,12 +621,26 @@ public class FragmentInstanceContext extends QueryContext { this.devicePathsToContext = devicePathsToContext; } - public void addExternalTsFilePaths(List<String> externalTsFilePaths) { - if (this.externalTsFilePaths == null) { - this.externalTsFilePaths = new ArrayList<>(externalTsFilePaths); - return; + public void addExternalTsFileQueryResource( + ExternalTsFileQueryResource externalTsFileQueryResource) { + this.externalTsFileQueryResource = externalTsFileQueryResource; + } + + public boolean initExternalTsFileQueryDataSource( + ExternalTsFileQueryResource externalTsFileQueryResource) { + long startTime = System.nanoTime(); + try { + if (externalTsFileQueryResource == null) { + this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE; + return true; + } + + this.sharedQueryDataSource = new ExternalTsFileQueryDataSource(externalTsFileQueryResource); + closedUnseqFileNum = externalTsFileQueryResource.getTsFileResources().size(); + return true; + } finally { + addInitQueryDataSourceCost(System.nanoTime() - startTime); } - this.externalTsFilePaths.addAll(externalTsFilePaths); } public MemoryReservationManager getMemoryReservationContext() { @@ -799,44 +810,6 @@ public class FragmentInstanceContext extends QueryContext { } } - public boolean initExternalTsFileQueryDataSource(List<String> externalTsFilePaths) - throws QueryProcessException { - long startTime = System.nanoTime(); - try { - if (externalTsFilePaths == null || externalTsFilePaths.isEmpty()) { - this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE; - return true; - } - - externalTsFileResources = new ArrayList<>(externalTsFilePaths.size()); - for (String externalTsFilePath : externalTsFilePaths) { - TsFileResource resource = - new TsFileResource(new File(externalTsFilePath), TsFileResourceStatus.NORMAL); - if (resource.resourceFileExists()) { - try { - resource.deserialize(); - } catch (IOException e) { - throw new QueryProcessException( - "Failed to deserialize external TsFile resource: " - + externalTsFilePath - + ", " - + e.getMessage()); - } - } else { - resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, Long.MAX_VALUE)); - } - externalTsFileResources.add(resource); - } - - this.sharedQueryDataSource = - new QueryDataSource(Collections.emptyList(), externalTsFileResources); - closedUnseqFileNum = externalTsFileResources.size(); - return true; - } finally { - addInitQueryDataSourceCost(System.nanoTime() - startTime); - } - } - public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProcessException { if (sharedQueryDataSource == null) { switch (queryDataSourceType) { @@ -863,8 +836,8 @@ public class FragmentInstanceContext extends QueryContext { } break; case EXTERNAL_TSFILE_SCAN: - if (initExternalTsFileQueryDataSource(externalTsFilePaths)) { - externalTsFilePaths = null; + if (initExternalTsFileQueryDataSource(externalTsFileQueryResource)) { + externalTsFileQueryResource = null; } else { return getUnfinishedQueryDataSource(); } @@ -1081,9 +1054,7 @@ public class FragmentInstanceContext extends QueryContext { unClosedFilePaths = null; } - if (externalTsFileResources != null) { - externalTsFileResources = null; - } + externalTsFileQueryResource = null; // release TVList/AlignedTVList owned by current query releaseTVListOwnedByQuery(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index e3030c0752c..031c10dc8a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@ -251,18 +251,9 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe // all data of current device has been consumed updateResultTsBlock(); timeIterator.resetCurTimeRange(); - nextDevice(); - - if (currentDeviceIndex < deviceCount) { - // construct AlignedSeriesScanUtil for next device - constructAlignedSeriesScanUtil(); - queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); - } + moveToNextDevice(); if (currentDeviceIndex >= deviceCount) { - // all devices have been consumed - timeIterator.setFinished(); return Optional.of(true); } else { return Optional.of(false); @@ -715,22 +706,25 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe if (allAggregatorsHasFinalResult && (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR || tableAggregators.isEmpty())) { - nextDevice(); - inputTsBlock = null; - - if (currentDeviceIndex < deviceCount) { - // construct AlignedSeriesScanUtil for next device - constructAlignedSeriesScanUtil(); - queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); - } + moveToNextDevice(); + allAggregatorsHasFinalResult = false; + } + } - if (currentDeviceIndex >= deviceCount) { - // all devices have been consumed - timeIterator.setFinished(); - } + protected void moveToNextDevice() throws Exception { + nextDevice(); + inputTsBlock = null; - allAggregatorsHasFinalResult = false; + if (currentDeviceIndex < deviceCount) { + // construct AlignedSeriesScanUtil for next device + constructAlignedSeriesScanUtil(); + queryDataSource.reset(); + this.seriesScanUtil.initQueryDataSource(queryDataSource); + } + + if (currentDeviceIndex >= deviceCount) { + // all devices have been consumed + timeIterator.setFinished(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java deleted file mode 100644 index e4b119291ae..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.execution.operator.source.relational; - -import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; - -import org.apache.tsfile.utils.RamUsageEstimator; - -import java.util.List; - -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; - -public abstract class AbstractDeviceTableScanOperator extends AbstractTableScanOperator { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); - - protected List<DeviceEntry> deviceEntries; - protected int deviceCount; - protected int currentDeviceIndex; - - protected AbstractDeviceTableScanOperator(AbstractTableScanOperatorParameter parameter) { - super(parameter); - this.deviceEntries = parameter.deviceEntries; - this.deviceCount = parameter.deviceEntries.size(); - this.currentDeviceIndex = 0; - this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); - recordCurrentDeviceIndex(); - constructAlignedSeriesScanUtil(); - } - - @Override - protected boolean hasCurrentDeviceEntry() { - return currentDeviceIndex < deviceCount; - } - - @Override - protected DeviceEntry getCurrentDeviceEntry() { - return deviceEntries.get(currentDeviceIndex); - } - - @Override - protected boolean advanceDeviceEntry() { - currentDeviceIndex++; - return hasCurrentDeviceEntry(); - } - - @Override - protected void recordCurrentDeviceIndex() { - this.operatorContext.recordSpecifiedInfo( - CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); - } - - @Override - public long ramBytesUsed() { - return super.ramBytesUsed() - + INSTANCE_SIZE - - AbstractTableScanOperator.INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(deviceEntries); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java index 45481e95f11..523f0d1367a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; @@ -50,15 +51,20 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperator { protected static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AbstractTableScanOperator.class); + RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class); private final List<ColumnSchema> columnSchemas; private final int[] columnsIndexArray; + protected final List<DeviceEntry> deviceEntries; + + protected final int deviceCount; + protected final Ordering scanOrder; protected final SeriesScanOptions seriesScanOptions; @@ -78,11 +84,16 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat private QueryDataSource queryDataSource; + protected int currentDeviceIndex; + public AbstractTableScanOperator(AbstractTableScanOperatorParameter parameter) { this.sourceId = parameter.sourceId; this.operatorContext = parameter.context; this.columnSchemas = parameter.columnSchemas; this.columnsIndexArray = parameter.columnsIndexArray; + this.deviceEntries = parameter.deviceEntries; + this.deviceCount = parameter.deviceEntries.size(); + this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); this.scanOrder = parameter.scanOrder; this.seriesScanOptions = parameter.seriesScanOptions; this.measurementColumnNames = parameter.measurementColumnNames; @@ -92,12 +103,17 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat parameter.measurementSchemas.stream() .map(IMeasurementSchema::getType) .collect(Collectors.toList()); + this.currentDeviceIndex = 0; + this.operatorContext.recordSpecifiedInfo( + CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); + // allSensors include time and all field columns this.maxReturnSize = Math.min( maxReturnSize, allSensors.size() * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); this.maxTsBlockLineNum = parameter.maxTsBlockLineNum; + constructAlignedSeriesScanUtil(); } @Override @@ -181,7 +197,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat } private void constructResultTsBlock() { - DeviceEntry currentDeviceEntry = getCurrentDeviceEntry(); + DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); this.resultTsBlock = MeasurementToTableViewAdaptorUtils.toTableBlock( measurementDataBlock, @@ -201,7 +217,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat @Override public boolean isFinished() throws Exception { return (retainedTsBlock == null) - && (!hasCurrentDeviceEntry() || seriesScanOptions.limitConsumedUp()); + && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); } @Override @@ -233,37 +249,32 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat this.measurementDataBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } - private void moveToNextDevice() { - if (advanceDeviceEntry()) { + protected void moveToNextDevice() { + currentDeviceIndex++; + if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device constructAlignedSeriesScanUtil(); // reset QueryDataSource queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); - recordCurrentDeviceIndex(); + this.operatorContext.recordSpecifiedInfo( + CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); } } - protected abstract boolean hasCurrentDeviceEntry(); - - protected abstract DeviceEntry getCurrentDeviceEntry(); - - protected abstract boolean advanceDeviceEntry(); - - protected abstract void recordCurrentDeviceIndex(); - protected void constructAlignedSeriesScanUtil() { - if (!hasCurrentDeviceEntry()) { + if (this.deviceEntries.isEmpty() || currentDeviceIndex >= deviceCount) { // no need to construct SeriesScanUtil, hasNext will return false return; } - if (getCurrentDeviceEntry() == null) { - throw new IllegalStateException("Current device entry in TableScanOperator is empty"); + if (this.deviceEntries.get(this.currentDeviceIndex) == null) { + throw new IllegalStateException( + "Device entries of index " + this.currentDeviceIndex + " in TableScanOperator is empty"); } - DeviceEntry deviceEntry = getCurrentDeviceEntry(); + DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); AlignedFullPath alignedPath = constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas, allSensors); this.seriesScanUtil = @@ -291,7 +302,8 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()); + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries); } public static class AbstractTableScanOperatorParameter { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java index 3b4b2704103..f6e4a84fc75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileAggTableScanOperator.java @@ -22,11 +22,12 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; @@ -34,8 +35,6 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; -import java.util.Collections; -import java.util.List; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; @@ -43,39 +42,19 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileAggTableScanOperator.class); - private final String tableName; - private final ExternalTsFileQueryResource externalTsFileQueryResource; private final int deviceTaskPartitionIndex; - private MultiWayMergeReader deviceTaskReader; - private int loadedDeviceOffsetIndex = -1; - private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + private DeviceTaskRunReader deviceTaskReader; public ExternalTsFileAggTableScanOperator( - AbstractAggTableScanOperatorParameter parameter, - String tableName, - ExternalTsFileQueryResource externalTsFileQueryResource, - int deviceTaskPartitionIndex) { + AbstractAggTableScanOperatorParameter parameter, int deviceTaskPartitionIndex) { super(parameter); - this.tableName = tableName; - this.externalTsFileQueryResource = externalTsFileQueryResource; this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; } - @Override - String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { - int segmentOffset = - deviceEntry.getDeviceID().segmentNum() > 0 - && tableName.equalsIgnoreCase((String) deviceEntry.getNthSegment(0)) - ? 1 - : 0; - Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset); - return segment == null ? null : (String) segment; - } - @Override protected void constructAlignedSeriesScanUtil() { DeviceEntry deviceEntry = - deviceEntries.isEmpty() || deviceEntries.get(currentDeviceIndex) == null + !hasCurrentRealDeviceEntry() ? new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new Binary[0]) : deviceEntries.get(currentDeviceIndex); this.seriesScanUtil = @@ -92,39 +71,75 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, AlignedFullPath alignedPath) throws IOException { - if (deviceEntries.isEmpty() || currentDeviceIndex >= deviceEntries.size()) { + if (!hasCurrentRealDeviceEntry()) { return null; } return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, deviceEntries.get(currentDeviceIndex).getDeviceID(), - getCurrentDeviceOffsets(), - externalTsFileQueryResource.getTsFilePaths(), + deviceTaskReader.getCurrentDeviceOffsetMap().get(resource), ((OperatorContext) operatorContext).getInstanceContext(), seriesScanOptions.getGlobalTimeFilter()); } - private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException { - if (loadedDeviceOffsetIndex == currentDeviceIndex) { - return currentDeviceOffsets; - } - if (deviceTaskReader == null) { + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + ExternalTsFileQueryResource externalTsFileQueryResource = + ((ExternalTsFileQueryDataSource) dataSource).getExternalTsFileQueryResource(); + if (hasCurrentRealDeviceEntry() && deviceTaskReader == null) { deviceTaskReader = - externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex); + externalTsFileQueryResource.getDeviceTaskRunReader(deviceTaskPartitionIndex); } - DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); - while (deviceTaskReader.hasNextDevice()) { - DeviceEntry deviceEntry = deviceTaskReader.nextDevice(); - if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { - currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets(); - loadedDeviceOffsetIndex = currentDeviceIndex; - return currentDeviceOffsets; + super.initQueryDataSource( + hasCurrentRealDeviceEntry() ? updateCurrentDeviceQueryDataSource() : dataSource); + } + + @Override + protected void moveToNextDevice() throws Exception { + nextDevice(); + inputTsBlock = null; + + if (currentDeviceIndex < deviceCount) { + constructAlignedSeriesScanUtil(); + queryDataSource = + hasCurrentRealDeviceEntry() ? updateCurrentDeviceQueryDataSource() : queryDataSource; + seriesScanUtil.initQueryDataSource(queryDataSource); + } + + if (currentDeviceIndex >= deviceCount) { + timeIterator.setFinished(); + } + } + + private boolean hasCurrentRealDeviceEntry() { + return !deviceEntries.isEmpty() + && currentDeviceIndex < deviceEntries.size() + && deviceEntries.get(currentDeviceIndex) != null; + } + + private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() { + try { + if (!deviceTaskReader.nextDevice()) { + throw new IllegalStateException( + "Unexpected end of external TsFile device task reader at device index " + + currentDeviceIndex); + } + DeviceEntry expectedDeviceEntry = deviceEntries.get(currentDeviceIndex); + DeviceEntry currentDeviceEntry = deviceTaskReader.getCurrentDevice(); + if (!expectedDeviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { + throw new IllegalStateException( + String.format( + "External TsFile device task reader is not aligned with device entries at index %d:" + + " expected %s but got %s", + currentDeviceIndex, + expectedDeviceEntry.getDeviceID(), + currentDeviceEntry.getDeviceID())); } + return deviceTaskReader.getCurrentDeviceQueryDataSource(); + } catch (IOException e) { + throw new RuntimeException("Failed to update external TsFile device resources", e); } - currentDeviceOffsets = Collections.emptyList(); - loadedDeviceOffsetIndex = currentDeviceIndex; - return currentDeviceOffsets; } @Override @@ -138,9 +153,6 @@ public class ExternalTsFileAggTableScanOperator extends DefaultAggTableScanOpera @Override public long ramBytesUsed() { - return super.ramBytesUsed() - + INSTANCE_SIZE - - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets); + return super.ramBytesUsed() + INSTANCE_SIZE - AbstractDefaultAggTableScanOperator.INSTANCE_SIZE; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java index 6ad6aa2af3f..d4d327be703 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -86,20 +86,15 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { TsFileResource resource, AlignedFullPath alignedPath, IDeviceID currentDeviceID, - List<DeviceOffset> currentDeviceOffsets, - List<String> tsFilePaths, + DeviceOffset currentDeviceOffset, FragmentInstanceContext context, Filter globalTimeFilter) throws IOException { - if (currentDeviceOffsets == null || !currentDeviceID.equals(alignedPath.getDeviceId())) { + if (currentDeviceOffset == null || !currentDeviceID.equals(alignedPath.getDeviceId())) { return null; } - long[] deviceMeasurementNodeOffset = - getDeviceMeasurementNodeOffset(currentDeviceOffsets, tsFilePaths, resource.getTsFilePath()); - if (deviceMeasurementNodeOffset == null) { - return null; - } + long[] deviceMeasurementNodeOffset = currentDeviceOffset.getMeasurementNodeOffset(); // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata // loading in this branch. return FileLoaderUtils.loadAlignedTimeSeriesMetadata( @@ -111,16 +106,6 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { context.isIgnoreAllNullRows()); } - private static long[] getDeviceMeasurementNodeOffset( - List<DeviceOffset> currentDeviceOffsets, List<String> tsFilePaths, String tsFilePath) { - for (DeviceOffset offset : currentDeviceOffsets) { - if (tsFilePath.equals(tsFilePaths.get(offset.getFileIndex()))) { - return offset.getMeasurementNodeOffset(); - } - } - return null; - } - @FunctionalInterface public interface ExternalTsFileMetadataLoader { AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java index ad14540e291..73a9cb11ad5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileTableScanOperator.java @@ -19,63 +19,41 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryDataSource; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset; -import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.MultiWayMergeReader; +import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; import org.apache.tsfile.utils.RamUsageEstimator; import java.io.IOException; -import java.util.Collections; -import java.util.List; public class ExternalTsFileTableScanOperator extends TableScanOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(ExternalTsFileTableScanOperator.class); - private static final long ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); - private final String tableName; - private final ExternalTsFileQueryResource externalTsFileQueryResource; private final int deviceTaskPartitionIndex; - private MultiWayMergeReader deviceTaskReader; - private int loadedDeviceOffsetIndex = -1; - private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + private DeviceTaskRunReader deviceTaskReader; public ExternalTsFileTableScanOperator( - AbstractTableScanOperatorParameter parameter, - String tableName, - ExternalTsFileQueryResource externalTsFileQueryResource, - int deviceTaskPartitionIndex) { + AbstractTableScanOperatorParameter parameter, int deviceTaskPartitionIndex) { super(parameter); - this.tableName = tableName; - this.externalTsFileQueryResource = externalTsFileQueryResource; this.deviceTaskPartitionIndex = deviceTaskPartitionIndex; } - @Override - String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { - int segmentOffset = - deviceEntry.getDeviceID().segmentNum() > 0 - && tableName.equalsIgnoreCase((String) deviceEntry.getNthSegment(0)) - ? 1 - : 0; - Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset); - return segment == null ? null : (String) segment; - } - @Override protected void constructAlignedSeriesScanUtil() { - if (!hasCurrentDeviceEntry()) { + if (currentDeviceIndex >= deviceCount) { return; } - DeviceEntry deviceEntry = getCurrentDeviceEntry(); + DeviceEntry deviceEntry = deviceEntries.get(currentDeviceIndex); if (deviceEntry == null) { throw new IllegalStateException("Current device entry in TableScanOperator is empty"); } @@ -97,33 +75,58 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { return ExternalTsFileSeriesScanUtil.loadTimeSeriesMetadata( resource, alignedPath, - getCurrentDeviceEntry().getDeviceID(), - getCurrentDeviceOffsets(), - externalTsFileQueryResource.getTsFilePaths(), + deviceEntries.get(currentDeviceIndex).getDeviceID(), + deviceTaskReader.getCurrentDeviceOffsetMap().get(resource), ((OperatorContext) operatorContext).getInstanceContext(), seriesScanOptions.getGlobalTimeFilter()); } - private List<DeviceOffset> getCurrentDeviceOffsets() throws IOException { - if (loadedDeviceOffsetIndex == currentDeviceIndex) { - return currentDeviceOffsets; - } + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + ExternalTsFileQueryResource externalTsFileQueryResource = + ((ExternalTsFileQueryDataSource) dataSource).getExternalTsFileQueryResource(); if (deviceTaskReader == null) { deviceTaskReader = - externalTsFileQueryResource.getMultiWayMergeReader(deviceTaskPartitionIndex); + externalTsFileQueryResource.getDeviceTaskRunReader(deviceTaskPartitionIndex); + } + IQueryDataSource currentDataSource = + currentDeviceIndex < deviceCount ? updateCurrentDeviceQueryDataSource() : dataSource; + super.initQueryDataSource(currentDataSource); + } + + @Override + protected void moveToNextDevice() { + currentDeviceIndex++; + if (currentDeviceIndex < deviceCount) { + constructAlignedSeriesScanUtil(); + seriesScanUtil.initQueryDataSource(updateCurrentDeviceQueryDataSource()); + this.operatorContext.recordSpecifiedInfo( + CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); } - DeviceEntry currentDeviceEntry = getCurrentDeviceEntry(); - while (deviceTaskReader.hasNextDevice()) { - DeviceEntry deviceEntry = deviceTaskReader.nextDevice(); - if (deviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { - currentDeviceOffsets = deviceTaskReader.getCurrentDeviceOffsets(); - loadedDeviceOffsetIndex = currentDeviceIndex; - return currentDeviceOffsets; + } + + private ExternalTsFileQueryDataSource updateCurrentDeviceQueryDataSource() { + try { + if (!deviceTaskReader.nextDevice()) { + throw new IllegalStateException( + "Unexpected end of external TsFile device task reader at device index " + + currentDeviceIndex); + } + DeviceEntry expectedDeviceEntry = deviceEntries.get(currentDeviceIndex); + DeviceEntry currentDeviceEntry = deviceTaskReader.getCurrentDevice(); + if (!expectedDeviceEntry.getDeviceID().equals(currentDeviceEntry.getDeviceID())) { + throw new IllegalStateException( + String.format( + "External TsFile device task reader is not aligned with device entries at index %d:" + + " expected %s but got %s", + currentDeviceIndex, + expectedDeviceEntry.getDeviceID(), + currentDeviceEntry.getDeviceID())); } + return deviceTaskReader.getCurrentDeviceQueryDataSource(); + } catch (IOException e) { + throw new RuntimeException("Failed to update external TsFile device resources", e); } - currentDeviceOffsets = Collections.emptyList(); - loadedDeviceOffsetIndex = currentDeviceIndex; - return currentDeviceOffsets; } @Override @@ -137,9 +140,6 @@ public class ExternalTsFileTableScanOperator extends TableScanOperator { @Override public long ramBytesUsed() { - return super.ramBytesUsed() - + INSTANCE_SIZE - - ABSTRACT_DEVICE_TABLE_SCAN_OPERATOR_INSTANCE_SIZE - + RamUsageEstimator.sizeOfCollection(currentDeviceOffsets); + return super.ramBytesUsed() + INSTANCE_SIZE - AbstractTableScanOperator.INSTANCE_SIZE; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java index a50e978b377..11f10ef3dae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; -public class TableScanOperator extends AbstractDeviceTableScanOperator { +public class TableScanOperator extends AbstractTableScanOperator { public TableScanOperator(AbstractTableScanOperatorParameter parameter) { super(parameter); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java index 12feaccc42f..03d0e998e71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.tsfile.file.metadata.IDeviceID; -public class TreeAlignedDeviceViewScanOperator extends AbstractDeviceTableScanOperator { +public class TreeAlignedDeviceViewScanOperator extends AbstractTableScanOperator { private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index b0fc9e1d554..1bc40a2896b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -1128,6 +1128,13 @@ public class DataNodeTableOperatorGenerator @Override public Operator visitExternalTsFileScan( ExternalTsFileScanNode node, LocalExecutionPlanContext context) { + if (node.getDeviceEntries().isEmpty()) { + OperatorContext operatorContext = + addOperatorContext( + context, node.getPlanNodeId(), EmptyDataOperator.class.getSimpleName()); + return new EmptyDataOperator(operatorContext); + } + AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = constructAbstractTableScanOperatorParameter( node, @@ -1137,11 +1144,7 @@ public class DataNodeTableOperatorGenerator Long.MAX_VALUE); AbstractTableScanOperator externalTsFileTableScanOperator = - new ExternalTsFileTableScanOperator( - parameter, - node.getQualifiedObjectName().getObjectName(), - node.getExternalTsFileQueryResource(), - node.getDeviceTaskPartitionIndex()); + new ExternalTsFileTableScanOperator(parameter, node.getDeviceTaskPartitionIndex()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); @@ -1149,7 +1152,9 @@ public class DataNodeTableOperatorGenerator dataDriverContext.addSourceOperator(externalTsFileTableScanOperator); dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN); dataDriverContext.setInputDriver(true); - context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths()); + context + .getInstanceContext() + .addExternalTsFileQueryResource(node.getExternalTsFileQueryResource()); return externalTsFileTableScanOperator; } @@ -1640,11 +1645,7 @@ public class DataNodeTableOperatorGenerator constructAbstractAggTableScanOperatorParameter(node, context); ExternalTsFileAggTableScanOperator aggTableScanOperator = - new ExternalTsFileAggTableScanOperator( - parameter, - node.getQualifiedObjectName().getObjectName(), - node.getExternalTsFileQueryResource(), - node.getDeviceTaskPartitionIndex()); + new ExternalTsFileAggTableScanOperator(parameter, node.getDeviceTaskPartitionIndex()); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); addSource( @@ -1658,7 +1659,9 @@ public class DataNodeTableOperatorGenerator DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN); - context.getInstanceContext().addExternalTsFilePaths(node.getTsFilePaths()); + context + .getInstanceContext() + .addExternalTsFileQueryResource(node.getExternalTsFileQueryResource()); return aggTableScanOperator; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java new file mode 100644 index 00000000000..ac55a8cbfe2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile; + +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.filter.basic.Filter; + +import java.util.Collections; +import java.util.List; + +public class ExternalTsFileQueryDataSource extends QueryDataSource { + + private final ExternalTsFileQueryResource externalTsFileQueryResource; + + public ExternalTsFileQueryDataSource(ExternalTsFileQueryResource externalTsFileQueryResource) { + this(externalTsFileQueryResource, Collections.emptyList()); + } + + ExternalTsFileQueryDataSource( + ExternalTsFileQueryResource externalTsFileQueryResource, + List<TsFileResource> unseqResources) { + super(Collections.emptyList(), unseqResources); + this.externalTsFileQueryResource = externalTsFileQueryResource; + } + + @Override + public IQueryDataSource clone() { + return new ExternalTsFileQueryDataSource(externalTsFileQueryResource, getUnseqResources()); + } + + public ExternalTsFileQueryResource getExternalTsFileQueryResource() { + return externalTsFileQueryResource; + } + + @Override + public boolean isSeqSatisfied( + IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { + return true; + } + + @Override + public boolean isUnSeqSatisfied( + IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { + return true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java index 8371c6a1687..bd45d94e188 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java @@ -26,6 +26,9 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.Exte import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.LazyTsFileDeviceIterator; @@ -38,7 +41,7 @@ import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -64,19 +67,20 @@ import static java.util.Objects.requireNonNull; public class ExternalTsFileQueryResource implements AutoCloseable { public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile"; - private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; + private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 1; + // private static final long DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES = 8L * 1024 * 1024; private static final long DEVICE_OFFSET_INSTANCE_SIZE_IN_BYTES = 32L; private final QueryId queryId; private final Path queryTempRoot; private final String tableName; private final List<String> tsFilePaths; + private final List<TsFileResource> tsFileResources; private final LongConsumer ioSizeRecorder; private final List<DeviceEntry> deviceEntries = new ArrayList<>(); private List<DeviceTaskPartition> deviceTaskPartitions = Collections.emptyList(); private Comparator<DeviceEntry> deviceEntryComparator; - private boolean readersRetained; private boolean closed; public ExternalTsFileQueryResource( @@ -95,16 +99,19 @@ public class ExternalTsFileQueryResource implements AutoCloseable { this.tableName = tableName; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths"))); + this.tsFileResources = createTsFileResources(this.tsFilePaths); this.ioSizeRecorder = requireNonNull(ioSizeRecorder, "ioSizeRecorder is null"); + for (String tsFilePath : tsFilePaths) { + FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath); + } } - public synchronized void collectDeviceEntries( + public void collectDeviceEntries( SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int partitionCount) { checkNotClosed(); - retainFileReaderReferences(); ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = new ExternalTsFileDeviceFilterVisitor(); try (DeviceCollector deviceCollector = new DeviceCollector()) { - List<DeviceTaskPartition> partitions = createDeviceTaskPartitions(partitionCount); + createDeviceTaskPartitions(partitionCount); while (deviceCollector.hasNextDevice()) { IDeviceID deviceID = deviceCollector.nextDevice(); if (schemaFilter != null @@ -118,22 +125,23 @@ public class ExternalTsFileQueryResource implements AutoCloseable { new DeviceTask( deviceEntryIndex, new ArrayList<>(deviceCollector.getCurrentDeviceOffsets())); DeviceTaskPartition partition = - partitions.get(Math.floorMod(deviceID.hashCode(), partitions.size())); + deviceTaskPartitions.get( + Math.floorMod(deviceID.hashCode(), deviceTaskPartitions.size())); partition.add(deviceTask); if (partition.getEstimatedSizeInBytes() >= DEVICE_TASK_BUCKET_TARGET_SIZE_IN_BYTES) { partition.flush(comparator); } } deviceEntryComparator = comparator; - collectDeviceTaskPartitions(partitions, comparator); + collectDeviceTaskPartitions(comparator); } } - public synchronized MultiWayMergeReader getMultiWayMergeReader(int partitionIndex) { + public DeviceTaskRunReader getDeviceTaskRunReader(int partitionIndex) { checkNotClosed(); DeviceTaskPartition partition = getDeviceTaskPartition(partitionIndex); try { - return new DeviceTaskRunReader(partition.getRunFiles(), deviceEntries, deviceEntryComparator); + return new DeviceTaskRunReader(partition); } catch (IOException e) { throw new RuntimeException("Failed to create external TsFile device task run reader", e); } @@ -143,6 +151,10 @@ public class ExternalTsFileQueryResource implements AutoCloseable { return tsFilePaths; } + public List<TsFileResource> getTsFileResources() { + return tsFileResources; + } + public List<DeviceEntry> getDeviceEntries() { return deviceEntries; } @@ -162,7 +174,7 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } @Override - public synchronized void close() { + public void close() { if (closed) { return; } @@ -175,24 +187,10 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } - private void retainFileReaderReferences() { - if (readersRetained) { - return; - } - for (String tsFilePath : tsFilePaths) { - FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath); - } - readersRetained = true; - } - private void releaseFileReaderReferences() { - if (!readersRetained) { - return; - } for (String tsFilePath : tsFilePaths) { FileReaderManager.getInstance().decreaseExternalFileReaderReference(tsFilePath); } - readersRetained = false; } private void checkNotClosed() { @@ -201,15 +199,28 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } - public interface MultiWayMergeReader extends AutoCloseable { - boolean hasNextDevice() throws IOException; - - DeviceEntry nextDevice() throws IOException; - - List<DeviceOffset> getCurrentDeviceOffsets(); - - @Override - void close() throws IOException; + private static List<TsFileResource> createTsFileResources(List<String> tsFilePaths) { + List<TsFileResource> tsFileResources = new ArrayList<>(tsFilePaths.size()); + for (String tsFilePath : tsFilePaths) { + TsFileResource resource = + new TsFileResource(new File(tsFilePath), TsFileResourceStatus.NORMAL); + if (resource.resourceFileExists()) { + try { + resource.deserialize(); + } catch (IOException e) { + throw new RuntimeException( + "Failed to deserialize external TsFile resource: " + + tsFilePath + + ", " + + e.getMessage(), + e); + } + } else { + resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, Long.MAX_VALUE)); + } + tsFileResources.add(resource); + } + return Collections.unmodifiableList(tsFileResources); } public class DeviceTaskPartition { @@ -283,6 +294,16 @@ public class ExternalTsFileQueryResource implements AutoCloseable { return !deviceEntryIndexes.isEmpty(); } + private void finish(Comparator<DeviceEntry> comparator) { + if (pendingDeviceTasks.isEmpty()) { + return; + } + sortPendingDeviceTasks(comparator); + for (DeviceTask deviceTask : pendingDeviceTasks) { + deviceEntryIndexes.add(deviceTask.deviceEntryIndex); + } + } + private void sortDeviceEntries(Comparator<DeviceEntry> comparator) { if (comparator != null) { deviceEntryIndexes.sort( @@ -300,36 +321,34 @@ public class ExternalTsFileQueryResource implements AutoCloseable { private List<Path> getRunFiles() { return runFiles; } + + private List<DeviceTask> getPendingDeviceTasks() { + return pendingDeviceTasks; + } } - private List<DeviceTaskPartition> createDeviceTaskPartitions(int partitionCount) { + private void createDeviceTaskPartitions(int partitionCount) { if (partitionCount <= 0) { throw new IllegalArgumentException( "External TsFile device task partition count must be positive"); } - List<DeviceTaskPartition> partitions = new ArrayList<>(partitionCount); + deviceTaskPartitions = new ArrayList<>(partitionCount); for (int i = 0; i < partitionCount; i++) { - partitions.add(new DeviceTaskPartition(i)); + deviceTaskPartitions.add(new DeviceTaskPartition(i)); } - return partitions; } - private void collectDeviceTaskPartitions( - List<DeviceTaskPartition> partitions, Comparator<DeviceEntry> comparator) { - if (partitions.isEmpty()) { - deviceTaskPartitions = Collections.emptyList(); - return; - } - List<DeviceTaskPartition> nonEmptyPartitions = new ArrayList<>(partitions.size()); - for (DeviceTaskPartition partition : partitions) { - partition.flush(comparator); + private void collectDeviceTaskPartitions(Comparator<DeviceEntry> comparator) { + Iterator<DeviceTaskPartition> iterator = deviceTaskPartitions.iterator(); + while (iterator.hasNext()) { + DeviceTaskPartition partition = iterator.next(); + partition.finish(comparator); if (!partition.hasDeviceTasks()) { + iterator.remove(); continue; } partition.sortDeviceEntries(comparator); - nonEmptyPartitions.add(partition); } - deviceTaskPartitions = nonEmptyPartitions; } private Path writeDeviceTaskRun(Path runRoot, int runIndex, List<DeviceTask> deviceTasks) @@ -356,58 +375,74 @@ public class ExternalTsFileQueryResource implements AutoCloseable { return size; } - private static class DeviceTaskRunReader implements MultiWayMergeReader { + public class DeviceTaskRunReader implements AutoCloseable { - private final List<DeviceEntry> deviceEntries; private final PriorityQueue<DeviceTaskRunCursor> runCursors; - private DeviceTask nextDeviceTask; - private List<DeviceOffset> currentDeviceOffsets = Collections.emptyList(); + private DeviceEntry currentDevice; + private ExternalTsFileQueryDataSource currentDeviceQueryDataSource; + private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap; - private DeviceTaskRunReader( - List<Path> runFiles, List<DeviceEntry> deviceEntries, Comparator<DeviceEntry> comparator) - throws IOException { - this.deviceEntries = deviceEntries; + private DeviceTaskRunReader(DeviceTaskPartition partition) throws IOException { Comparator<DeviceTaskRunCursor> cursorComparator = (left, right) -> - comparator == null + deviceEntryComparator == null ? left.getCurrentDeviceEntry() .getDeviceID() .compareTo(right.getCurrentDeviceEntry().getDeviceID()) - : comparator.compare(left.getCurrentDeviceEntry(), right.getCurrentDeviceEntry()); + : deviceEntryComparator.compare( + left.getCurrentDeviceEntry(), right.getCurrentDeviceEntry()); this.runCursors = new PriorityQueue<>(cursorComparator); - for (Path runFile : runFiles) { - DeviceTaskRunCursor cursor = new DeviceTaskRunCursor(runFile, deviceEntries); + for (Path runFile : partition.getRunFiles()) { + DeviceTaskRunCursor cursor = new DiskDeviceTaskRunCursor(runFile, deviceEntries); if (cursor.hasCurrentDeviceTask()) { runCursors.add(cursor); } else { cursor.close(); } } + DeviceTaskRunCursor memoryCursor = + new MemoryDeviceTaskRunCursor(partition.getPendingDeviceTasks(), deviceEntries); + if (memoryCursor.hasCurrentDeviceTask()) { + runCursors.add(memoryCursor); + } } - @Override - public boolean hasNextDevice() throws IOException { - if (nextDeviceTask != null) { - return true; + public boolean nextDevice() throws IOException { + if (runCursors.isEmpty()) { + return false; + } + DeviceTaskRunCursor cursor = runCursors.poll(); + DeviceTask result = cursor.getCurrentDeviceTask(); + cursor.advance(); + if (cursor.hasCurrentDeviceTask()) { + runCursors.add(cursor); + } else { + cursor.close(); } - nextDeviceTask = readNextDeviceTask(); - return nextDeviceTask != null; - } - @Override - public DeviceEntry nextDevice() throws IOException { - if (!hasNextDevice()) { - throw new EOFException("No more external TsFile device task"); + currentDevice = deviceEntries.get(result.deviceEntryIndex); + List<TsFileResource> unseqResources = new ArrayList<>(result.deviceOffsets.size()); + currentDeviceOffsetMap = new HashMap<>(result.deviceOffsets.size()); + for (DeviceOffset deviceOffset : result.deviceOffsets) { + TsFileResource tsFileResource = tsFileResources.get(deviceOffset.getFileIndex()); + unseqResources.add(tsFileResource); + currentDeviceOffsetMap.put(tsFileResource, deviceOffset); } - DeviceTask deviceTask = nextDeviceTask; - nextDeviceTask = null; - currentDeviceOffsets = deviceTask.deviceOffsets; - return deviceEntries.get(deviceTask.deviceEntryIndex); + currentDeviceQueryDataSource = + new ExternalTsFileQueryDataSource(ExternalTsFileQueryResource.this, unseqResources); + return true; } - @Override - public List<DeviceOffset> getCurrentDeviceOffsets() { - return currentDeviceOffsets; + public DeviceEntry getCurrentDevice() { + return currentDevice; + } + + public ExternalTsFileQueryDataSource getCurrentDeviceQueryDataSource() { + return currentDeviceQueryDataSource; + } + + public Map<TsFileResource, DeviceOffset> getCurrentDeviceOffsetMap() { + return currentDeviceOffsetMap; } @Override @@ -428,31 +463,28 @@ public class ExternalTsFileQueryResource implements AutoCloseable { throw exception; } } + } - private DeviceTask readNextDeviceTask() throws IOException { - if (runCursors.isEmpty()) { - return null; - } - DeviceTaskRunCursor cursor = runCursors.poll(); - DeviceTask result = cursor.getCurrentDeviceTask(); - cursor.advance(); - if (cursor.hasCurrentDeviceTask()) { - runCursors.add(cursor); - } else { - cursor.close(); - } - return result; - } + private interface DeviceTaskRunCursor extends Closeable { + + boolean hasCurrentDeviceTask(); + + DeviceTask getCurrentDeviceTask(); + + DeviceEntry getCurrentDeviceEntry(); + + void advance() throws IOException; } - private static class DeviceTaskRunCursor implements Closeable { + private static class DiskDeviceTaskRunCursor implements DeviceTaskRunCursor { private final List<DeviceEntry> deviceEntries; private final DataInputStream inputStream; private int remainingDeviceTasks; private DeviceTask currentDeviceTask; - private DeviceTaskRunCursor(Path runFile, List<DeviceEntry> deviceEntries) throws IOException { + private DiskDeviceTaskRunCursor(Path runFile, List<DeviceEntry> deviceEntries) + throws IOException { this.deviceEntries = deviceEntries; this.inputStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(runFile))); @@ -460,7 +492,8 @@ public class ExternalTsFileQueryResource implements AutoCloseable { advance(); } - private void advance() throws IOException { + @Override + public void advance() throws IOException { if (remainingDeviceTasks <= 0) { currentDeviceTask = null; return; @@ -469,15 +502,18 @@ public class ExternalTsFileQueryResource implements AutoCloseable { currentDeviceTask = DeviceTask.deserialize(inputStream); } - private boolean hasCurrentDeviceTask() { + @Override + public boolean hasCurrentDeviceTask() { return currentDeviceTask != null; } - private DeviceTask getCurrentDeviceTask() { + @Override + public DeviceTask getCurrentDeviceTask() { return currentDeviceTask; } - private DeviceEntry getCurrentDeviceEntry() { + @Override + public DeviceEntry getCurrentDeviceEntry() { return deviceEntries.get(currentDeviceTask.deviceEntryIndex); } @@ -487,6 +523,50 @@ public class ExternalTsFileQueryResource implements AutoCloseable { } } + private static class MemoryDeviceTaskRunCursor implements DeviceTaskRunCursor { + + private final List<DeviceTask> deviceTasks; + private final List<DeviceEntry> deviceEntries; + private int nextIndex; + private DeviceTask currentDeviceTask; + + private MemoryDeviceTaskRunCursor( + List<DeviceTask> deviceTasks, List<DeviceEntry> deviceEntries) { + this.deviceTasks = deviceTasks; + this.deviceEntries = deviceEntries; + advance(); + } + + @Override + public void advance() { + if (nextIndex >= deviceTasks.size()) { + currentDeviceTask = null; + return; + } + currentDeviceTask = deviceTasks.get(nextIndex++); + } + + @Override + public boolean hasCurrentDeviceTask() { + return currentDeviceTask != null; + } + + @Override + public DeviceTask getCurrentDeviceTask() { + return currentDeviceTask; + } + + @Override + public DeviceEntry getCurrentDeviceEntry() { + return deviceEntries.get(currentDeviceTask.deviceEntryIndex); + } + + @Override + public void close() { + currentDeviceTask = null; + } + } + private class DeviceCollector implements Closeable { private final Map<Integer, LazyTsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 0b54b2395a7..92d30d4f649 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -799,13 +799,23 @@ public class TableDistributedPlanGenerator if (partitions.isEmpty()) { return Collections.singletonList(node); } + boolean needFinalAggregation = + node.getStep() == SINGLE && node.getGroupingKeys().isEmpty() && partitions.size() > 1; + AggregationNode finalAggregation = null; + AggregationTableScanNode partialTemplateNode = node; + if (needFinalAggregation) { + Pair<AggregationNode, AggregationTableScanNode> splitResult = + split(node, symbolAllocator, queryId); + finalAggregation = splitResult.left; + partialTemplateNode = splitResult.right; + } List<PlanNode> result = new ArrayList<>(partitions.size()); for (DeviceTaskPartition partition : partitions) { ExternalTsFileAggregationScanNode splitNode = new ExternalTsFileAggregationScanNode( queryId.genPlanNodeId(), node.getQualifiedObjectName(), - node.getOutputSymbols(), + partialTemplateNode.getOutputSymbols(), node.getAssignments(), node.getTagAndAttributeIndexMap(), node.getScanOrder(), @@ -816,11 +826,11 @@ public class TableDistributedPlanGenerator node.isPushLimitToEachDevice(), node.containsNonAlignedDevice(), node.getProjection(), - node.getAggregations(), - node.getGroupingSets(), - node.getPreGroupedSymbols(), - node.getStep(), - node.getGroupIdSymbol(), + partialTemplateNode.getAggregations(), + partialTemplateNode.getGroupingSets(), + partialTemplateNode.getPreGroupedSymbols(), + partialTemplateNode.getStep(), + partialTemplateNode.getGroupIdSymbol(), node.getExternalTsFileQueryResource(), partition.getDeviceEntryIndexes(), partition.getPartitionIndex(), @@ -830,6 +840,11 @@ public class TableDistributedPlanGenerator } sortPropertyContext.ifPresent( propertyContext -> applySortProperty(node, result, propertyContext, false)); + if (needFinalAggregation) { + OrderingScheme childOrdering = nodeOrderingMap.get(result.get(0).getPlanNodeId()); + finalAggregation.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering, result)); + return Collections.singletonList(finalAggregation); + } return result; }
