This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new da82fb67d51 Finish write MemTable
da82fb67d51 is described below
commit da82fb67d51d41f526295f2161b99173474fc5dd
Author: jt2594838 <[email protected]>
AuthorDate: Fri Jun 21 15:07:20 2024 +0800
Finish write MemTable
---
.../dataregion/DataExecutionVisitor.java | 35 +----------
.../plan/analyze/schema/SchemaValidator.java | 26 +--------
.../plan/planner/plan/node/write/InsertNode.java | 67 ++++++++++++++++++----
.../planner/plan/node/write/InsertTabletNode.java | 66 +++++++++++++++++++++
.../node/write/RelationalInsertTabletNode.java | 12 ++++
.../plan/relational/metadata/ColumnSchema.java | 8 +++
.../plan/relational/metadata/TableSchema.java | 30 ++++++++++
.../relational/sql/ast/WrappedInsertStatement.java | 30 ++++++++++
.../db/storageengine/dataregion/DataRegion.java | 62 ++++++++------------
.../dataregion/memtable/AbstractMemTable.java | 44 ++++++++------
.../memtable/AlignedWritableMemChunk.java | 9 +--
.../memtable/AlignedWritableMemChunkGroup.java | 5 +-
.../dataregion/memtable/IWritableMemChunk.java | 5 +-
.../memtable/IWritableMemChunkGroup.java | 3 +-
.../dataregion/memtable/TsFileProcessor.java | 8 +++
.../dataregion/memtable/WritableMemChunk.java | 5 +-
.../dataregion/memtable/WritableMemChunkGroup.java | 3 +-
.../org/apache/iotdb/db/utils/CommonUtils.java | 10 ++++
.../java/org/apache/iotdb/db/utils/MemUtils.java | 13 ++++-
.../db/utils/datastructure/AlignedTVList.java | 12 ++--
.../iotdb/db/utils/datastructure/TVList.java | 4 +-
.../storageengine/dataregion/DataRegionTest.java | 28 ++++-----
.../dataregion/memtable/TsFileProcessorTest.java | 28 ++++-----
.../org/apache/iotdb/db/utils/MemUtilsTest.java | 2 +-
.../db/utils/datastructure/AlignedTVListTest.java | 10 ++--
.../apache/iotdb/commons/schema/table/TsTable.java | 1 +
.../schema/table/column/TsTableColumnCategory.java | 14 +++++
27 files changed, 364 insertions(+), 176 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index a46df8d1b5a..4fbccc10049 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -76,44 +76,13 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
@Override
public TSStatus visitRelationalInsertTablet(RelationalInsertTabletNode node,
DataRegion dataRegion) {
- try {
- dataRegion.insertRelationalTablet(node);
- return StatusUtils.OK;
- } catch (OutOfTTLException e) {
- LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
- return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (WriteProcessRejectException e) {
- LOGGER.warn("Reject in executing plan node: {}, caused by {}", node,
e.getMessage());
- return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (WriteProcessException e) {
- LOGGER.error("Error in executing plan node: {}", node, e);
- return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
- } catch (BatchProcessException e) {
- LOGGER.warn(
- "Batch failure in executing a InsertTabletNode. device: {},
startTime: {}, measurements: {}, failing status: {}",
- node.getDevicePath(),
- node.getTimes()[0],
- node.getMeasurements(),
- e.getFailingStatus());
- // For each error
- TSStatus firstStatus = null;
- for (TSStatus status : e.getFailingStatus()) {
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- firstStatus = status;
- }
- // Return WRITE_PROCESS_REJECT directly for the consensus retry logic
- if (status.getCode() ==
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
- return status;
- }
- }
- return firstStatus;
- }
+ return visitInsertTablet(node, dataRegion);
}
@Override
public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion
dataRegion) {
try {
- dataRegion.insertTreeTablet(node);
+ dataRegion.insertTablet(node);
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 71dc2afbe4b..be74ef5be14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -68,7 +68,7 @@ public class SchemaValidator {
final TableSchema incomingSchema = insertStatement.getTableSchema();
final TableSchema realSchema =
metadata.validateTableHeaderSchema(databaseName,
incomingSchema, context);
- validate(incomingSchema, realSchema);
+ insertStatement.validate(realSchema);
metadata.validateDeviceSchema(insertStatement, context);
insertStatement.updateAfterSchemaValidation(context);
} catch (QueryProcessException e) {
@@ -76,31 +76,7 @@ public class SchemaValidator {
}
}
- public static void validate(TableSchema incomingSchema, TableSchema
realSchema) {
- final List<ColumnSchema> incomingSchemaColumns =
incomingSchema.getColumns();
- Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
- realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
- for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
- final ColumnSchema realSchemaColumn =
realSchemaMap.get(incomingSchemaColumn.getName());
- validate(incomingSchemaColumn, realSchemaColumn);
- }
- }
-
- public static void validate(ColumnSchema incoming, ColumnSchema real) {
- if (real == null) {
- throw new SemanticException("Column " + incoming.getName() + " does not
exists or fails to be "
- + "created");
- }
- if (!incoming.getType().equals(real.getType())) {
- throw new SemanticException(String.format("Inconsistent data type of
column %s: %s/%s",
- incoming.getName(), incoming.getType(), real.getType()));
- }
- if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
- throw new SemanticException(String.format("Inconsistent column category
of column %s: %s/%s",
- incoming.getName(), incoming.getColumnCategory(),
real.getColumnCategory()));
- }
- }
public static ISchemaTree validate(
ISchemaFetcher schemaFetcher,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index f81150db2a8..032020ef303 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -31,12 +32,14 @@ import
org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
@@ -48,12 +51,14 @@ import java.util.Objects;
public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
- /** this insert node doesn't need to participate in iot consensus */
+ /**
+ * this insert node doesn't need to participate in iot consensus
+ */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/**
- * if use id table, this filed is id form of device path <br>
- * if not, this filed is device path<br>
+ * if use id table, this filed is id form of device path <br> if not, this
filed is device
+ * path<br>
*/
protected PartialPath devicePath;
@@ -64,12 +69,12 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected TsTableColumnCategory[] columnCategories;
protected List<Integer> idColumnIndices;
+ protected int measurementColumnCnt = -1;
protected int failedMeasurementNumber = 0;
/**
- * device id reference, for reuse device id in both id table and memtable
<br>
- * used in memtable
+ * device id reference, for reuse device id in both id table and memtable
<br> used in memtable
*/
protected IDeviceID deviceID;
@@ -79,7 +84,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- /** Physical address of data region after splitting */
+ /**
+ * Physical address of data region after splitting
+ */
protected TRegionReplicaSet dataRegionReplicaSet;
protected ProgressIndex progressIndex;
@@ -150,6 +157,11 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return measurements;
}
+ public boolean isValidMeasurement(int i) {
+ return measurementSchemas != null && measurementSchemas[i] != null &&
(columnCategories == null
+ || columnCategories[i] == TsTableColumnCategory.MEASUREMENT);
+ }
+
public void setMeasurements(String[] measurements) {
this.measurements = measurements;
}
@@ -158,6 +170,20 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return dataTypes;
}
+ public int getMeasurementColumnCnt() {
+ if (measurementColumnCnt == -1) {
+ measurementColumnCnt = 0;
+ if (measurementSchemas != null) {
+ for (int i = 0; i < measurementSchemas.length; i++) {
+ if (isValidMeasurement(i)) {
+ measurementColumnCnt++;
+ }
+ }
+ }
+ }
+ return measurementColumnCnt;
+ }
+
public TSDataType getDataType(int index) {
return dataTypes[index];
}
@@ -181,7 +207,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return searchIndex;
}
- /** Search index should start from 1 */
+ /**
+ * Search index should start from 1
+ */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
}
@@ -197,7 +225,10 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
}
// region Serialization methods for WAL
- /** Serialized size of measurement schemas, ignoring failed time series */
+
+ /**
+ * Serialized size of measurement schemas, ignoring failed time series
+ */
protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
for (int i = 0; i < measurements.length; i++) {
@@ -210,7 +241,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return byteLen;
}
- /** Serialize measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurement schemas, ignoring failed time series
+ */
protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
for (int i = 0; i < measurements.length; i++) {
// ignore failed partial insert
@@ -305,9 +338,15 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
InsertNode that = (InsertNode) o;
return isAligned == that.isAligned
&& Objects.equals(devicePath, that.devicePath)
@@ -343,4 +382,8 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
}
}
}
+
+ public String getTableName() {
+ return null;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index e46e6cc91e1..d0e733b69e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,14 +19,20 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
+
import java.util.Map.Entry;
import java.util.function.IntFunction;
+import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -37,8 +43,11 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -1183,4 +1192,61 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
return Collections.singletonList(new Pair<>(deviceID, end));
}
+
+
+ /**
+ *
+ * @param results insertion result of each row
+ * @param rowTTLGetter the ttl associated with each row
+ * @return the position of the first alive row
+ * @throws OutOfTTLException if all rows have expired the TTL
+ */
+ public int checkTTL(TSStatus[] results,
+ IntToLongFunction rowTTLGetter)
+ throws OutOfTTLException {
+ return checkTTLInternal(results, rowTTLGetter, true);
+ }
+
+ protected int checkTTLInternal(TSStatus[] results,
+ IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
+ throws OutOfTTLException {
+
+ /*
+ * assume that batch has been sorted by client
+ */
+ int loc = 0;
+ long ttl = 0;
+ int firstAliveLoc = -1;
+ while (loc < getRowCount()) {
+ ttl = rowTTLGetter.applyAsLong(loc);
+ long currTime = getTimes()[loc];
+ // skip points that do not satisfy TTL
+ if (!isAlive(currTime, ttl)) {
+ results[loc] =
+ RpcUtils.getStatus(
+ TSStatusCode.OUT_OF_TTL,
+ String.format(
+ "Insertion time [%s] is less than ttl time bound [%s]",
+ DateTimeUtils.convertLongToDate(currTime),
+ DateTimeUtils.convertLongToDate(
+ CommonDateTimeUtils.currentTime() - ttl)));
+ } else {
+ if (firstAliveLoc == -1) {
+ firstAliveLoc = loc;
+ }
+ if (breakOnFirstAlive) {
+ break;
+ }
+ }
+ loc++;
+ }
+
+ if (firstAliveLoc == -1) {
+ // no alive data
+ throw new OutOfTTLException(
+ getTimes()[getTimes().length - 1],
+ (CommonDateTimeUtils.currentTime() - ttl));
+ }
+ return firstAliveLoc;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index c60499e97f3..f86e3302d87 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -24,8 +24,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.IntToLongFunction;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -157,5 +160,14 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
return result;
}
+
+ @Override
+ public int checkTTL(TSStatus[] results, IntToLongFunction rowTTLGetter)
throws OutOfTTLException {
+ return checkTTLInternal(results, rowTTLGetter, false);
+ }
+
+ public String getTableName() {
+ return deviceID.getTableName();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index bb460c61423..91f074811d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -21,12 +21,14 @@ package
org.apache.iotdb.db.queryengine.plan.relational.metadata;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.tsfile.read.common.type.BinaryType;
import org.apache.tsfile.read.common.type.BooleanType;
import org.apache.tsfile.read.common.type.DoubleType;
import org.apache.tsfile.read.common.type.FloatType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.read.common.type.UnknownType;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -35,6 +37,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.StringJoiner;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
@@ -143,6 +146,11 @@ public class ColumnSchema {
}
}
+ public static ColumnSchema ofTsColumnSchema(TsTableColumnSchema schema) {
+ return new ColumnSchema(schema.getColumnName(),
TypeFactory.getType(schema.getDataType()),
+ false, schema.getColumnCategory());
+ }
+
public static Builder builder() {
return new Builder();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index e3ed509931c..4b88ef4807f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -19,7 +19,14 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
public class TableSchema {
@@ -39,4 +46,27 @@ public class TableSchema {
public List<ColumnSchema> getColumns() {
return columns;
}
+
+ public static TableSchema of(TsTable tsTable) {
+ String tableName = tsTable.getTableName();
+ List<ColumnSchema> columns = new ArrayList<>();
+ for (TsTableColumnSchema tsTableColumnSchema : tsTable.getColumnList()) {
+ columns.add(ColumnSchema.ofTsColumnSchema(tsTableColumnSchema));
+ }
+ return new TableSchema(tableName, columns);
+ }
+
+ public org.apache.tsfile.file.metadata.TableSchema toTsFileTableSchema() {
+ // TODO-Table: unify redundant definitions
+ String tableName = this.getTableName();
+ List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+ List<ColumnType> columnTypes = new ArrayList<>();
+ for (ColumnSchema column : columns) {
+ measurementSchemas.add(new MeasurementSchema(column.getName(),
+ InternalTypeManager.getTSDataType(column.getType())));
+ columnTypes.add(column.getColumnCategory().toTsFileColumnType());
+ }
+ return
+ new org.apache.tsfile.file.metadata.TableSchema(tableName,
measurementSchemas, columnTypes);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index f81a26dc594..355786f86af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,8 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
@@ -67,4 +71,30 @@ public abstract class WrappedInsertStatement extends
WrappedStatement implements
return tableSchema;
}
+
+ public void validate(TableSchema realSchema) throws QueryProcessException {
+ final List<ColumnSchema> incomingSchemaColumns =
getTableSchema().getColumns();
+ Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
+ realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
+
+ for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
+ final ColumnSchema realSchemaColumn =
realSchemaMap.get(incomingSchemaColumn.getName());
+ validate(incomingSchemaColumn, realSchemaColumn);
+ }
+ }
+
+ public static void validate(ColumnSchema incoming, ColumnSchema real) {
+ if (real == null) {
+ throw new SemanticException("Column " + incoming.getName() + " does not
exists or fails to be "
+ + "created");
+ }
+ if (!incoming.getType().equals(real.getType())) {
+ throw new SemanticException(String.format("Inconsistent data type of
column %s: %s/%s",
+ incoming.getName(), incoming.getType(), real.getType()));
+ }
+ if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
+ throw new SemanticException(String.format("Inconsistent column category
of column %s: %s/%s",
+ incoming.getName(), incoming.getColumnCategory(),
real.getColumnCategory()));
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7bf4862ac12..bdd0922a759 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion;
-import java.util.function.IntFunction;
import java.util.function.IntToLongFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -58,10 +57,13 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCach
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.service.SettleService;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -113,6 +115,7 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
+import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -932,7 +935,7 @@ public class DataRegion implements IDataRegionForQuery {
// reject insertions that are out of ttl
long deviceTTL =
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
- if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
throw new OutOfTTLException(
insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() -
deviceTTL));
}
@@ -966,21 +969,6 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * Insert a tablet (rows belonging to the same devices) into this database.
- *
- * @throws BatchProcessException if some of the rows failed to be inserted
- */
- @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- public void insertTreeTablet(InsertTabletNode insertTabletNode)
- throws BatchProcessException, WriteProcessException {
- insertTablet(insertTabletNode, false);
- }
-
- public void insertRelationalTablet(InsertTabletNode insertTabletNode)
- throws BatchProcessException, WriteProcessException {
- insertTablet(insertTabletNode, true);
- }
private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
return config.isEnableSeparateData()
@@ -1056,8 +1044,7 @@ public class DataRegion implements IDataRegionForQuery {
* @throws BatchProcessException if some of the rows failed to be inserted
*/
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- private void insertTablet(InsertTabletNode insertTabletNode,
- boolean checkAllRowTtl)
+ public void insertTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
StorageEngine.blockInsertionIfReject(null);
long startTime = System.nanoTime();
@@ -1070,12 +1057,11 @@ public class DataRegion implements IDataRegionForQuery {
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure;
+ int loc = insertTabletNode.checkTTL(results, i ->
DataNodeTTLCache.getInstance()
+ .getTTL(insertTabletNode.getDeviceID(i)));
+ noFailure = loc == 0;
- int loc = checkTTL(insertTabletNode, results, i ->
DataNodeTTLCache.getInstance()
- .getTTL(insertTabletNode.getDeviceID(i)), !checkAllRowTtl);
- noFailure = loc != 0;
-
- noFailure = noFailure & splitAndInsert(insertTabletNode, loc, results);
+ noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
startTime = System.nanoTime();
tryToUpdateInsertTabletLastCache(insertTabletNode);
@@ -1117,7 +1103,7 @@ public class DataRegion implements IDataRegionForQuery {
ttl = rowTTLGetter.applyAsLong(loc);
long currTime = insertTabletNode.getTimes()[loc];
// skip points that do not satisfy TTL
- if (!isAlive(currTime, ttl)) {
+ if (!CommonUtils.isAlive(currTime, ttl)) {
results[loc] =
RpcUtils.getStatus(
TSStatusCode.OUT_OF_TTL,
@@ -1147,15 +1133,6 @@ public class DataRegion implements IDataRegionForQuery {
}
- /**
- * Check whether the time falls in TTL.
- *
- * @return whether the given time falls in ttl
- */
- private boolean isAlive(long time, long dataTTL) {
- return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() -
time) <= dataTTL;
- }
-
/**
* insert batch to tsfile processor thread-safety that the caller need to
guarantee The rows to be
* inserted are in the range [start, end) Null value in each column values
will be replaced by the
@@ -1193,6 +1170,9 @@ public class DataRegion implements IDataRegionForQuery {
return false;
}
+ // register TableSchema (and maybe more) for table insertion
+ registerToTsFile(insertTabletNode, tsFileProcessor);
+
try {
tsFileProcessor.insertTablet(insertTabletNode, start, end, results,
noFailure);
} catch (WriteProcessRejectException e) {
@@ -1210,6 +1190,14 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ private void registerToTsFile(InsertNode node, TsFileProcessor
tsFileProcessor) {
+ final String tableName = node.getTableName();
+ if (tableName != null) {
+ tsFileProcessor.registerToTsFile(tableName,
+ t ->
TableSchema.of(DataNodeTableCache.getInstance().getTable(getDatabaseName(),
t)).toTsFileTableSchema());
+ }
+ }
+
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
@@ -3373,7 +3361,7 @@ public class DataRegion implements IDataRegionForQuery {
Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new
HashMap<>();
for (int i = 0; i <
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
InsertRowNode insertRowNode =
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
- if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
// we do not need to write these part of data, as they can not be
queried
// or the sub-plan has already been executed, we are retrying other
sub-plans
insertRowsOfOneDeviceNode
@@ -3485,7 +3473,7 @@ public class DataRegion implements IDataRegionForQuery {
InsertRowNode insertRowNode =
insertRowsNode.getInsertRowNodeList().get(i);
long deviceTTL =
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
- if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+ if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
insertRowsNode
.getResults()
.put(
@@ -3538,7 +3526,7 @@ public class DataRegion implements IDataRegionForQuery {
for (int i = 0; i <
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
InsertTabletNode insertTabletNode =
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
try {
- insertTreeTablet(insertTabletNode);
+ insertTablet(insertTabletNode);
} catch (WriteProcessException e) {
insertMultiTabletsNode
.getResults()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 263edad395b..f17b9fc0ec6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -354,10 +355,11 @@ public abstract class AbstractMemTable implements
IMemTable {
TSStatus[] results)
throws WriteProcessException {
try {
- writeAlignedTablet(insertTabletNode, start, end);
- memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end);
+ writeAlignedTablet(insertTabletNode, start, end, results);
+ //TODO-Table: what is the relation between this and
TsFileProcessor.checkMemCost
+ memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end,
results);
int pointsInserted =
- (insertTabletNode.getDataTypes().length -
insertTabletNode.getFailedMeasurementNumber())
+ (insertTabletNode.getMeasurementColumnCnt() -
insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
MetricService.getInstance()
@@ -432,16 +434,18 @@ public abstract class AbstractMemTable implements
IMemTable {
insertTabletNode.getBitMaps(),
schemaList,
start,
- end)) {
+ end, null)) {
shouldFlush = true;
}
}
- public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start,
int end) {
+ public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start,
int end,
+ TSStatus[] results) {
List<IMeasurementSchema> schemaList = new ArrayList<>();
for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
- if (insertTabletNode.getColumns()[i] == null) {
+ if (insertTabletNode.getColumns()[i] == null ||
+ (insertTabletNode.getColumnCategories() != null &&
insertTabletNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT)) {
schemaList.add(null);
} else {
schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
@@ -450,16 +454,24 @@ public abstract class AbstractMemTable implements
IMemTable {
if (schemaList.isEmpty()) {
return;
}
- IWritableMemChunkGroup memChunkGroup =
-
createAlignedMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(),
schemaList);
- if (memChunkGroup.writeValuesWithFlushCheck(
- insertTabletNode.getTimes(),
- insertTabletNode.getColumns(),
- insertTabletNode.getBitMaps(),
- schemaList,
- start,
- end)) {
- shouldFlush = true;
+ final List<Pair<IDeviceID, Integer>> deviceEndOffsetPair =
insertTabletNode.splitByDevice(start, end);
+ int splitStart = start;
+ for (Pair<IDeviceID, Integer> pair : deviceEndOffsetPair) {
+ final IDeviceID deviceID = pair.left;
+ int splitEnd = pair.right;
+ IWritableMemChunkGroup memChunkGroup =
+ createAlignedMemChunkGroupIfNotExistAndGet(deviceID, schemaList);
+ if (memChunkGroup.writeValuesWithFlushCheck(
+ insertTabletNode.getTimes(),
+ insertTabletNode.getColumns(),
+ insertTabletNode.getBitMaps(),
+ schemaList,
+ splitStart,
+ splitEnd,
+ results)) {
+ shouldFlush = true;
+ }
+ splitStart = splitEnd;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a219df17908..f904b9de49a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -156,8 +157,8 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
@Override
public boolean putAlignedValuesWithFlushCheck(
- long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
- list.putAlignedValues(t, v, bitMaps, start, end);
+ long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[]
results) {
+ list.putAlignedValues(t, v, bitMaps, start, end, results);
return list.reachMaxChunkSizeThreshold();
}
@@ -187,13 +188,13 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end) {
+ int end, TSStatus[] results) {
Pair<Object[], BitMap[]> pair =
checkAndReorderColumnValuesInInsertPlan(schemaList, valueList,
bitMaps);
Object[] reorderedColumnValues = pair.left;
BitMap[] reorderedBitMaps = pair.right;
return putAlignedValuesWithFlushCheck(
- times, reorderedColumnValues, reorderedBitMaps, start, end);
+ times, reorderedColumnValues, reorderedBitMaps, start, end, results);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index c541e52e817..797880b186f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
@@ -55,9 +56,9 @@ public class AlignedWritableMemChunkGroup implements
IWritableMemChunkGroup {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end) {
+ int end, TSStatus[] results) {
return memChunk.writeAlignedValuesWithFlushCheck(
- times, columns, bitMaps, schemaList, start, end);
+ times, columns, bitMaps, schemaList, start, end, results);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index eac06484035..376444072da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -58,7 +59,7 @@ public interface IWritableMemChunk extends WALEntryValue {
void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
boolean putAlignedValuesWithFlushCheck(
- long[] t, Object[] v, BitMap[] bitMaps, int start, int end);
+ long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[]
results);
boolean writeWithFlushCheck(long insertTime, Object objectValue);
@@ -78,7 +79,7 @@ public interface IWritableMemChunk extends WALEntryValue {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end);
+ int end, TSStatus[] results);
long count();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index 3f94f361a35..29ab0f4afbb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -36,7 +37,7 @@ public interface IWritableMemChunkGroup extends WALEntryValue
{
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end);
+ int end, TSStatus[] results);
void release();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 4cb4210d222..bece21045f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -88,6 +89,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -2201,4 +2203,10 @@ public class TsFileProcessor {
public ConcurrentLinkedDeque<IMemTable> getFlushingMemTable() {
return flushingMemTables;
}
+
+ public void registerToTsFile(String tableName,
+ Function<String, TableSchema> tableSchemaFunction) {
+ getWriter().getKnownSchema().getTableSchemaMap().computeIfAbsent(tableName,
+ tableSchemaFunction);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 4537809f5af..36282e91840 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
@@ -137,7 +138,7 @@ public class WritableMemChunk implements IWritableMemChunk {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end) {
+ int end, TSStatus[] results) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
list.getDataType());
}
@@ -211,7 +212,7 @@ public class WritableMemChunk implements IWritableMemChunk {
@Override
public boolean putAlignedValuesWithFlushCheck(
- long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
+ long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[]
results) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 5d61fe0e3dd..867b2aa11dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
@@ -51,7 +52,7 @@ public class WritableMemChunkGroup implements
IWritableMemChunkGroup {
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
- int end) {
+ int end, TSStatus[] results) {
boolean flushFlag = false;
for (int i = 0; i < columns.length; i++) {
if (columns[i] == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 6a179cbaccb..e5c40c1b8c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -414,4 +415,13 @@ public class CommonUtils {
}
return ret;
}
+
+ /**
+ * Check whether the time falls in TTL.
+ *
+ * @return whether the given time falls in ttl
+ */
+ public static boolean isAlive(long time, long dataTTL) {
+ return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() -
time) <= dataTTL;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 76953fe9fa2..7d179e7defc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -127,16 +127,23 @@ public class MemUtils {
return memSize;
}
- public static long getAlignedTabletSize(InsertTabletNode insertTabletNode,
int start, int end) {
+ public static long getAlignedTabletSize(InsertTabletNode insertTabletNode,
int start, int end,
+ TSStatus[] results) {
if (start >= end) {
return 0L;
}
long memSize = 0;
for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) {
- if (insertTabletNode.getMeasurements()[i] == null) {
+ if (!insertTabletNode.isValidMeasurement(i)) {
continue;
}
- memSize += (long) (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
+ if (results == null) {
+ memSize += (long) (end - start) *
insertTabletNode.getDataTypes()[i].getDataTypeSize();
+ } else {
+ for (int j = start; j < end; j++) {
+ memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+ }
+ }
}
// time and index column memSize for vector
memSize += (end - start) * (8L + 4L);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 8372468e3cd..9990294b7bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils.datastructure;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -723,7 +724,8 @@ public abstract class AlignedTVList extends TVList {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@Override
- public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end) {
+ public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end,
+ TSStatus[] results) {
checkExpansion();
int idx = start;
@@ -742,7 +744,8 @@ public abstract class AlignedTVList extends TVList {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
- || bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)) {
+ || bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)
+ || results != null && results[idx + i] != null) {
markNullValue(j, arrayIdx, elementIdx + i);
}
}
@@ -758,7 +761,8 @@ public abstract class AlignedTVList extends TVList {
indices.get(arrayIdx)[elementIdx + i] = rowCount;
for (int j = 0; j < values.size(); j++) {
if (value[j] == null
- || bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)) {
+ || bitMaps != null && bitMaps[j] != null &&
bitMaps[j].isMarked(idx + i)
+ || results != null && results[idx + i] != null) {
markNullValue(j, arrayIdx, elementIdx + i);
}
}
@@ -1259,7 +1263,7 @@ public abstract class AlignedTVList extends TVList {
}
AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
- tvList.putAlignedValues(times, values, bitMaps, 0, rowCount);
+ tvList.putAlignedValues(times, values, bitMaps, 0, rowCount, null);
return tvList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 99a34b47084..b2cd965458e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.utils.datastructure;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -189,7 +190,8 @@ public abstract class TVList implements WALEntryValue {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
- public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end) {
+ public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps,
int start, int end,
+ TSStatus[] results) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index b29ef2a48ac..80b45ebf46c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -287,7 +287,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -309,7 +309,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -365,7 +365,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
for (int r = 50; r < 149; r++) {
times[r - 50] = r;
@@ -386,7 +386,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
dataRegion.syncDeleteDataFiles();
Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
@@ -440,7 +440,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -462,7 +462,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -516,7 +516,7 @@ public class DataRegionTest {
times.length);
insertTabletNode1.setFailedMeasurementNumber(2);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -539,7 +539,7 @@ public class DataRegionTest {
times.length);
insertTabletNode2.setFailedMeasurementNumber(2);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -708,7 +708,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 149; r >= 50; r--) {
@@ -729,7 +729,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -793,7 +793,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 1249; r >= 50; r--) {
@@ -814,7 +814,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -878,7 +878,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode1);
+ dataRegion.insertTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 1249; r >= 50; r--) {
@@ -899,7 +899,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTreeTablet(insertTabletNode2);
+ dataRegion.insertTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index d016857ec49..770d0ce05f3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -303,12 +303,12 @@ public class TsFileProcessorTest {
this.sgInfo.initTsFileProcessorInfo(processor);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
// Test Tablet
- processor.insertTablet(genInsertTableNode(0, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(0, true), 0, 10, new
TSStatus[10], true);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(1596808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(100, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(100, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(1596808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(200, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(200, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(1596808, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(720360, memTable.memSize());
@@ -339,22 +339,22 @@ public class TsFileProcessorTest {
this.sgInfo.initTsFileProcessorInfo(processor);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
// Test Tablet
- processor.insertTablet(genInsertTableNode(0, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(0, true), 0, 10, new
TSStatus[10], true);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(1596808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(0, true), 0, 10,
new TSStatus[10]);
+ processor.insertTablet(genInsertTableNodeFors3000ToS6000(0, true), 0, 10,
new TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(100, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(100, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(100, true), 0,
10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNodeFors3000ToS6000(100, true), 0,
10, new TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(200, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(200, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(200, true), 0,
10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNodeFors3000ToS6000(200, true), 0,
10, new TSStatus[10], true);
Assert.assertEquals(3192808, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(300, true), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(300, true), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(6385616, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNodeFors3000ToS6000(300, true), 0,
10, new TSStatus[10]);
+ processor.insertTablet(genInsertTableNodeFors3000ToS6000(300, true), 0,
10, new TSStatus[10], true);
Assert.assertEquals(6385616, memTable.getTVListsRamCost());
Assert.assertEquals(240000, memTable.getTotalPointsNum());
@@ -393,12 +393,12 @@ public class TsFileProcessorTest {
this.sgInfo.initTsFileProcessorInfo(processor);
SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
// Test tablet
- processor.insertTablet(genInsertTableNode(0, false), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(0, false), 0, 10, new
TSStatus[10], true);
IMemTable memTable = processor.getWorkMemTable();
Assert.assertEquals(3192000, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(100, false), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(100, false), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192000, memTable.getTVListsRamCost());
- processor.insertTablet(genInsertTableNode(200, false), 0, 10, new
TSStatus[10]);
+ processor.insertTablet(genInsertTableNode(200, false), 0, 10, new
TSStatus[10], true);
Assert.assertEquals(3192000, memTable.getTVListsRamCost());
Assert.assertEquals(90000, memTable.getTotalPointsNum());
Assert.assertEquals(1440000, memTable.memSize());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
index 6b7208d10d3..bea447b649a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
@@ -163,7 +163,7 @@ public class MemUtilsTest {
null,
columns,
1);
- Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0,
1));
+ Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0,
1, null));
}
/** This method tests MemUtils.getStringMem() and MemUtils.getDataPointMem()
*/
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
index f7a0bb6e1eb..1b04f25d1a7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
@@ -104,7 +104,7 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
null, 0, 1000);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
null, 0, 1000, null);
for (long i = 0; i < tvList.rowCount; i++) {
Assert.assertEquals(tvList.rowCount - i, tvList.getTime((int) i));
}
@@ -132,7 +132,8 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000,
+ null);
for (long i = 0; i < tvList.rowCount; i++) {
Assert.assertEquals(tvList.rowCount - i, tvList.getTime((int) i));
if (i % 100 == 0) {
@@ -164,7 +165,8 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 1000,
+ null);
AlignedTVList clonedTvList = tvList.clone();
for (long i = 0; i < tvList.rowCount; i++) {
@@ -220,7 +222,7 @@ public class AlignedTVListTest {
}
tvList.putAlignedValues(
- ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 10);
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray,
bitMaps, 0, 10, null);
Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 720);
tvList.delete(5, 15);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index dd0d4662cc7..b0794d27f8f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -156,4 +156,5 @@ public class TsTable {
public int hashCode() {
return Objects.hash(tableName);
}
+
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
index ca4faedffe1..ab7ae9bf6d5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
public enum TsTableColumnCategory {
ID((byte) 0),
@@ -74,4 +75,17 @@ public enum TsTableColumnCategory {
throw new IllegalArgumentException();
}
}
+
+ public ColumnType toTsFileColumnType() {
+ switch (this) {
+ case ID:
+ return ColumnType.ID;
+ case ATTRIBUTE:
+ return ColumnType.ATTRIBUTE;
+ case MEASUREMENT:
+ return ColumnType.MEASUREMENT;
+ default:
+ throw new IllegalArgumentException("Unsupported column type in TsFile:
" + this);
+ }
+ }
}