This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch remove_attr_col in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c2a7b4fbe41b63277e0d3ec744c6a7bd465e1bcf Author: Tian Jiang <[email protected]> AuthorDate: Fri Jan 17 19:00:21 2025 +0800 Remove attribute columns after attributes are updated --- .../iotdb/it/env/cluster/node/DataNodeWrapper.java | 8 +++ .../it/session/IoTDBSessionRelationalIT.java | 82 ++++++++++++++++++++++ .../plan/analyze/schema/SchemaValidator.java | 1 + .../relational/sql/ast/WrappedInsertStatement.java | 78 ++++++++++++-------- .../plan/statement/crud/InsertBaseStatement.java | 50 +++++++++++++ .../crud/InsertMultiTabletsStatement.java | 11 +++ .../plan/statement/crud/InsertRowStatement.java | 7 ++ .../crud/InsertRowsOfOneDeviceStatement.java | 5 ++ .../plan/statement/crud/InsertRowsStatement.java | 11 +++ .../plan/statement/crud/InsertTabletStatement.java | 10 +++ .../storageengine/dataregion/wal/io/WALReader.java | 2 +- 11 files changed, 235 insertions(+), 30 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java index 3df21cb42a6..6bf36bf969a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java @@ -145,6 +145,14 @@ public class DataNodeWrapper extends AbstractNodeWrapper { return workDirFilePath("data/datanode/system/schema", IoTDBStartCheck.PROPERTIES_FILE_NAME); } + public String getDataDir() { + return getNodePath() + File.separator + "data"; + } + + public String getWalDir() { + return getDataDir() + File.separator + "datanode" + File.separator + "wal"; + } + @Override protected MppJVMConfig initVMConfig() { return MppJVMConfig.builder() diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index 1f70341892b..9ca071fc17d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java @@ -18,10 +18,16 @@ */ package org.apache.iotdb.relational.it.session; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; @@ -43,6 +49,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.File; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.util.ArrayList; @@ -56,6 +64,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @@ -1529,4 +1538,77 @@ public class IoTDBSessionRelationalIT { assertEquals(30, cnt); } } + + @Test + public void testAttrColumnRemoved() + throws IoTDBConnectionException, StatementExecutionException, IOException { + EnvFactory.getEnv().cleanClusterEnvironment(); + EnvFactory.getEnv().getConfig().getCommonConfig().setWalMode("SYNC"); + EnvFactory.getEnv().initClusterEnvironment(); + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("create database if not exists db1"); + session.executeNonQueryStatement("use db1"); + session.executeNonQueryStatement( + "CREATE TABLE remove_attr_col (tag1 string tag, attr1 string attribute, " + + "m1 double " + + "field)"); + + // insert tablet to WAL + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("tag1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("attr1", TSDataType.STRING)); + schemaList.add(new MeasurementSchema("m1", TSDataType.DOUBLE)); + final List<ColumnCategory> columnTypes = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD); + + long timestamp = 0; + Tablet tablet = + new Tablet( + "remove_attr_col", + IMeasurementSchema.getMeasurementNameList(schemaList), + IMeasurementSchema.getDataTypeList(schemaList), + columnTypes); + + for (int rowIndex = 0; rowIndex < 10; rowIndex++) { + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("tag1", rowIndex, "tag:1"); + tablet.addValue("attr1", rowIndex, "attr:" + timestamp); + tablet.addValue("m1", rowIndex, timestamp * 1.0); + timestamp++; + } + session.insert(tablet); + tablet.reset(); + + // insert records to WAL + session.executeNonQueryStatement( + "INSERT INTO remove_attr_col (time, tag1, attr1, m1) VALUES (10, 'tag:1', 'attr:10', 10.0)"); + + // check WAL + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String walNodeDir = dataNodeWrapper.getWalDir() + File.separator + "0"; + File[] walFiles = new File(walNodeDir).listFiles(f -> f.getName().endsWith(".wal")); + if (walFiles != null && walFiles.length > 0) { + File walFile = walFiles[0]; + WALEntry entry; + try (WALReader walReader = new WALReader(walFile)) { + entry = walReader.next(); + RelationalInsertTabletNode tabletNode = (RelationalInsertTabletNode) entry.getValue(); + assertTrue( + Arrays.stream(tabletNode.getColumnCategories()) + .noneMatch(c -> c == TsTableColumnCategory.ATTRIBUTE)); + + entry = walReader.next(); + RelationalInsertRowsNode rowsNode = (RelationalInsertRowsNode) entry.getValue(); + assertTrue( + Arrays.stream(rowsNode.getInsertRowNodeList().get(0).getColumnCategories()) + .noneMatch(c -> c == TsTableColumnCategory.ATTRIBUTE)); + return; + } + } + } + } finally { + EnvFactory.getEnv().cleanClusterEnvironment(); + EnvFactory.getEnv().initClusterEnvironment(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java index aaf5d08f43c..e2eafb53223 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java @@ -70,6 +70,7 @@ public class SchemaValidator { insertStatement.validateTableSchema(metadata, context); insertStatement.updateAfterSchemaValidation(context); insertStatement.validateDeviceSchema(metadata, context); + insertStatement.removeAttributeColumns(); } catch (final QueryProcessException e) { throw new SemanticException(e.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java index 43b09decdef..1cb351ec9f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java @@ -163,24 +163,49 @@ public abstract class WrappedInsertStatement extends WrappedStatement tableSchema = null; } + public static void processNonExistColumn( + ColumnSchema incoming, InsertBaseStatement innerTreeStatement, int i) { + // the column does not exist and auto-creation is disabled + SemanticException semanticException = + new SemanticException( + "Column " + incoming.getName() + " does not exists or fails to be " + "created", + TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()); + if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD + || !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { + // non-measurement columns cannot be partially inserted + throw semanticException; + } else { + // partial insertion + innerTreeStatement.markFailedMeasurement(i, semanticException); + } + } + + public static void processTypeConflictColumn( + ColumnSchema incoming, ColumnSchema real, int i, InsertBaseStatement innerTreeStatement) { + SemanticException semanticException = + new SemanticException( + String.format( + "Incompatible data type of column %s: %s/%s", + incoming.getName(), incoming.getType(), real.getType()), + TSStatusCode.DATA_TYPE_MISMATCH.getStatusCode()); + if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD + || !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { + // non-measurement columns cannot be partially inserted + throw semanticException; + } else { + // partial insertion + innerTreeStatement.markFailedMeasurement(i, semanticException); + } + } + public static void validateTableSchema( ColumnSchema incoming, ColumnSchema real, int i, InsertBaseStatement innerTreeStatement) { if (real == null) { - // the column does not exist and auto-creation is disabled - SemanticException semanticException = - new SemanticException( - "Column " + incoming.getName() + " does not exists or fails to be " + "created", - TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()); - if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD - || !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { - // non-measurement columns cannot be partially inserted - throw semanticException; - } else { - // partial insertion - innerTreeStatement.markFailedMeasurement(i, semanticException); - return; - } + processNonExistColumn(incoming, innerTreeStatement, i); + return; } + + // check data type if (incoming.getType() == null || incoming.getColumnCategory() != TsTableColumnCategory.FIELD) { // sql insertion does not provide type // the type is inferred and can be inconsistent with the existing one @@ -188,22 +213,11 @@ public abstract class WrappedInsertStatement extends WrappedStatement } else if (!InternalTypeManager.getTSDataType(real.getType()) .isCompatible(InternalTypeManager.getTSDataType(incoming.getType())) && !innerTreeStatement.isForceTypeConversion()) { - SemanticException semanticException = - new SemanticException( - String.format( - "Incompatible data type of column %s: %s/%s", - incoming.getName(), incoming.getType(), real.getType()), - TSStatusCode.DATA_TYPE_MISMATCH.getStatusCode()); - if (incoming.getColumnCategory() != TsTableColumnCategory.FIELD - || !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { - // non-measurement columns cannot be partially inserted - throw semanticException; - } else { - // partial insertion - innerTreeStatement.markFailedMeasurement(i, semanticException); - return; - } + processTypeConflictColumn(incoming, real, i, innerTreeStatement); + return; } + + // check column category if (incoming.getColumnCategory() == null) { // sql insertion does not provide category innerTreeStatement.setColumnCategory(real.getColumnCategory(), i); @@ -214,6 +228,8 @@ public abstract class WrappedInsertStatement extends WrappedStatement incoming.getName(), incoming.getColumnCategory(), real.getColumnCategory()), TSStatusCode.COLUMN_CATEGORY_MISMATCH.getStatusCode()); } + + // construct measurement schema TSDataType tsDataType = InternalTypeManager.getTSDataType(real.getType()); MeasurementSchema measurementSchema = new MeasurementSchema( @@ -244,4 +260,8 @@ public abstract class WrappedInsertStatement extends WrappedStatement public void toLowerCase() { getInnerTreeStatement().toLowerCase(); } + + public void removeAttributeColumns() { + getInnerTreeStatement().removeAttributeColumns(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index 34b73b678fc..6e0a8bf0b2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -368,6 +369,55 @@ public abstract class InsertBaseStatement extends Statement { .collect(Collectors.toList()); } + @TableModel + public void removeAttributeColumns() { + if (columnCategories == null) { + return; + } + + List<Integer> columnsToKeep = new ArrayList<>(); + for (int i = 0; i < columnCategories.length; i++) { + if (!columnCategories[i].equals(TsTableColumnCategory.ATTRIBUTE)) { + columnsToKeep.add(i); + } + } + + if (columnsToKeep.size() == columnCategories.length) { + return; + } + + if (failedMeasurementIndex2Info != null) { + failedMeasurementIndex2Info = + failedMeasurementIndex2Info.entrySet().stream() + .collect(Collectors.toMap(e -> columnsToKeep.indexOf(e.getKey()), Entry::getValue)); + } + + if (measurementSchemas != null) { + measurementSchemas = + columnsToKeep.stream().map(i -> measurementSchemas[i]).toArray(MeasurementSchema[]::new); + } + if (measurements != null) { + measurements = columnsToKeep.stream().map(i -> measurements[i]).toArray(String[]::new); + } + if (dataTypes != null) { + dataTypes = columnsToKeep.stream().map(i -> dataTypes[i]).toArray(TSDataType[]::new); + } + if (columnCategories != null) { + columnCategories = + columnsToKeep.stream() + .map(i -> columnCategories[i]) + .toArray(TsTableColumnCategory[]::new); + } + + subRemoveAttributeColumns(columnsToKeep); + + // to reconstruct indices + idColumnIndices = null; + attrColumnIndices = null; + } + + protected abstract void subRemoveAttributeColumns(List<Integer> columnsToKeep); + public static class FailedMeasurementInfo { protected String measurement; protected TSDataType dataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java index e103e6539c4..b4a1db885f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java @@ -31,6 +31,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.NotImplementedException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -172,4 +173,14 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement { } return database; } + + @Override + public void removeAttributeColumns() { + subRemoveAttributeColumns(Collections.emptyList()); + } + + @Override + protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) { + insertTabletStatementList.forEach(InsertBaseStatement::removeAttributeColumns); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index d33a83b0c32..ff0ccf310c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -526,4 +526,11 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa super.swapColumn(src, target); CommonUtils.swapArray(values, src, target); } + + @Override + protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) { + if (values != null) { + values = columnsToKeep.stream().map(i -> values[i]).toArray(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java index 08630f82945..4dab9d423ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java @@ -195,4 +195,9 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement { } return database; } + + @Override + protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) { + insertRowStatementList.forEach(InsertRowStatement::removeAttributeColumns); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java index 2d7ba7a2ba6..f0173885bdf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java @@ -35,6 +35,7 @@ import org.apache.tsfile.exception.NotImplementedException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -209,4 +210,14 @@ public class InsertRowsStatement extends InsertBaseStatement { public Statement toRelationalStatement(MPPQueryContext context) { return new InsertRows(this, context); } + + @Override + public void removeAttributeColumns() { + subRemoveAttributeColumns(Collections.emptyList()); + } + + @Override + protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) { + insertRowStatementList.forEach(InsertBaseStatement::removeAttributeColumns); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index e6f6f5f1a20..84190176791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -530,4 +530,14 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem } return nullBitMaps[col].isMarked(row); } + + @Override + protected void subRemoveAttributeColumns(List<Integer> columnsToKeep) { + if (columns != null) { + columns = columnsToKeep.stream().map(i -> columns[i]).toArray(); + } + if (nullBitMaps != null) { + nullBitMaps = columnsToKeep.stream().map(i -> nullBitMaps[i]).toArray(BitMap[]::new); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java index 261e95eb52f..0c2e5a18c00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java @@ -103,7 +103,7 @@ public class WALReader implements Closeable { * @throws NoSuchElementException when not calling hasNext before. */ public WALEntry next() { - if (nextEntry == null) { + if (!hasNext()) { throw new NoSuchElementException(); } WALEntry next = nextEntry;
