This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_table_model_empty_measurements in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 622b849e62154b23347a6032a59f97ad09abf6c8 Author: HTHou <[email protected]> AuthorDate: Wed Nov 27 12:39:54 2024 +0800 Fix table model insert without measurement error --- .../iotdb/session/it/IoTDBSessionRelationalIT.java | 32 ++++++++++++++++++++++ .../dataregion/flush/MemTableFlushTask.java | 2 +- .../dataregion/memtable/AbstractMemTable.java | 7 +++-- .../memtable/AlignedWritableMemChunk.java | 18 ++++++++---- .../memtable/AlignedWritableMemChunkGroup.java | 10 +++---- .../dataregion/memtable/IWritableMemChunk.java | 5 +--- .../dataregion/memtable/WritableMemChunk.java | 2 +- .../iotdb/db/tools/TsFileResourcePrinter.java | 2 +- .../apache/iotdb/db/tools/TsFileSketchTool.java | 3 +- 9 files changed, 59 insertions(+), 22 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java index eb28cf01d70..47ba178dd63 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java @@ -961,4 +961,36 @@ public class IoTDBSessionRelationalIT { } } } + + @Test + @Category({LocalStandaloneIT.class, ClusterIT.class}) + public void insertWithoutMeasurementTest() + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + session.executeNonQueryStatement("create table tb (a string id, b string measurement)"); + session.executeNonQueryStatement("insert into tb(a) values ('w')"); + SessionDataSet dataSet = session.executeQueryStatement("select * from tb"); + int cnt = 0; + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals("w", rowRecord.getFields().get(1).getBinaryV().toString()); + assertNull(rowRecord.getFields().get(2).getDataType()); + cnt++; + } + assertEquals(1, cnt); + + session.executeNonQueryStatement("flush"); + + dataSet = session.executeQueryStatement("select * from tb"); + cnt = 0; + while (dataSet.hasNext()) { + RowRecord rowRecord = dataSet.next(); + assertEquals("w", rowRecord.getFields().get(1).getBinaryV().toString()); + assertNull(rowRecord.getFields().get(2).getDataType()); + cnt++; + } + assertEquals(1, cnt); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 59633610b71..45a398b96bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -153,7 +153,7 @@ public class MemTableFlushTask { /* * sort task (first task of flush pipeline) */ - series.sortTvListForFlush(!deviceID.isTableModel()); + series.sortTvListForFlush(); long subTaskTime = System.currentTimeMillis() - startTime; sortTime += subTaskTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 6b0bd5e9f93..b35ab02501f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -190,7 +190,8 @@ public abstract class AbstractMemTable implements IMemTable { seriesNumber += schemaList.size(); totalPointsNumThreshold += ((long) avgSeriesPointNumThreshold) * schemaList.size(); return new AlignedWritableMemChunkGroup( - schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList())); + schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()), + k.isTableModel()); }); for (IMeasurementSchema schema : schemaList) { if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) { @@ -943,7 +944,7 @@ public abstract class AbstractMemTable implements IMemTable { boolean isAligned = ReadWriteIOUtils.readBool(stream); IWritableMemChunkGroup memChunkGroup; if (isAligned) { - memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream); + memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, deviceID.isTableModel()); } else { memChunkGroup = WritableMemChunkGroup.deserialize(stream); } @@ -974,7 +975,7 @@ public abstract class AbstractMemTable implements IMemTable { boolean isAligned = ReadWriteIOUtils.readBool(stream); IWritableMemChunkGroup memChunkGroup; if (isAligned) { - memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream); + memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream, false); } else { memChunkGroup = WritableMemChunkGroup.deserialize(stream); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 086e9922c79..8ff67c20096 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -59,7 +59,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; - public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) { + public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, boolean isTableModel) { this.measurementIndexMap = new LinkedHashMap<>(); List<TSDataType> dataTypeList = new ArrayList<>(); this.schemaList = schemaList; @@ -68,15 +68,18 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { dataTypeList.add(schemaList.get(i).getType()); } this.list = AlignedTVList.newAlignedList(dataTypeList); + this.ignoreAllNullRows = !isTableModel; } - private AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, AlignedTVList list) { + private AlignedWritableMemChunk( + List<IMeasurementSchema> schemaList, AlignedTVList list, boolean isTableModel) { this.measurementIndexMap = new LinkedHashMap<>(); this.schemaList = schemaList; for (int i = 0; i < schemaList.size(); i++) { measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i); } this.list = list; + this.ignoreAllNullRows = !isTableModel; } public Set<String> getAllMeasurements() { @@ -244,6 +247,9 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { @Override public long count() { + if (!ignoreAllNullRows && measurementIndexMap.isEmpty()) { + return list.rowCount(); + } return (long) list.rowCount() * measurementIndexMap.size(); } @@ -297,8 +303,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) { - this.ignoreAllNullRows = ignoreAllNullRows; + public synchronized void sortTvListForFlush() { sortTVList(); } @@ -533,7 +538,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { list.serializeToWAL(buffer); } - public static AlignedWritableMemChunk deserialize(DataInputStream stream) throws IOException { + public static AlignedWritableMemChunk deserialize(DataInputStream stream, boolean isTableModel) + throws IOException { int schemaListSize = stream.readInt(); List<IMeasurementSchema> schemaList = new ArrayList<>(schemaListSize); for (int i = 0; i < schemaListSize; i++) { @@ -542,7 +548,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } AlignedTVList list = (AlignedTVList) TVList.deserialize(stream); - return new AlignedWritableMemChunk(schemaList, list); + return new AlignedWritableMemChunk(schemaList, list, isTableModel); } public List<IMeasurementSchema> getSchemaList() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java index c5da5624a39..4e079b61787 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java @@ -40,8 +40,8 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup { private AlignedWritableMemChunk memChunk; - public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList) { - memChunk = new AlignedWritableMemChunk(schemaList); + public AlignedWritableMemChunkGroup(List<IMeasurementSchema> schemaList, boolean isTableModel) { + memChunk = new AlignedWritableMemChunk(schemaList, isTableModel); } private AlignedWritableMemChunkGroup() { @@ -151,10 +151,10 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup { memChunk.serializeToWAL(buffer); } - public static AlignedWritableMemChunkGroup deserialize(DataInputStream stream) - throws IOException { + public static AlignedWritableMemChunkGroup deserialize( + DataInputStream stream, boolean isTableModel) throws IOException { AlignedWritableMemChunkGroup memChunkGroup = new AlignedWritableMemChunkGroup(); - memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream); + memChunkGroup.memChunk = AlignedWritableMemChunk.deserialize(stream, isTableModel); return memChunkGroup; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 38ba0b01723..2d631e80209 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -122,11 +122,8 @@ public interface IWritableMemChunk extends WALEntryValue { * reference count * * <p>This interface should be synchronized for concurrent with getSortedTvListForQuery - * - * @param ignoreAllNullRows whether to ignore all null rows, true for tree model, false for table - * model */ - void sortTvListForFlush(boolean ignoreAllNullRows); + void sortTvListForFlush(); default TVList getTVList() { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 97c95e7f13a..4aad11dee53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -240,7 +240,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public synchronized void sortTvListForFlush(boolean ignoreAllNullRows) { + public synchronized void sortTvListForFlush() { sortTVList(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java index d3e5597ee3e..88e7d8c5ed3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileResourcePrinter.java @@ -40,7 +40,7 @@ public class TsFileResourcePrinter { @SuppressWarnings("squid:S106") public static void main(String[] args) throws IOException { - String folder = "data/data/sequence/root.group_1/0"; + String folder = "/Users/ht/Downloads/1732674107542-1-0-0.tsfile.resource"; if (args.length >= 1) { folder = args[0]; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index 7c2ce7bb5eb..fb9c89cdea8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -65,7 +65,8 @@ public class TsFileSketchTool { public static void main(String[] args) throws IOException { Pair<String, String> fileNames = checkArgs(args); - String filename = fileNames.left; + String filename = + "/Users/ht/Documents/iotdb/data/datanode/data/sequence/root.db/1/2864/1732677854708-1-0-0.tsfile"; String outFile = fileNames.right; System.out.println("TsFile path:" + filename); System.out.println("Sketch save path:" + outFile);
