This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 7313ad4850c fix getOrCreateSchemaRegion FetchDevice add
insertRelationalRow in MemTable add measurementColumnCnt
7313ad4850c is described below
commit 7313ad4850c2e6e2103c0502815ff9e85130eba7
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jul 9 11:09:37 2024 +0800
fix getOrCreateSchemaRegion FetchDevice
add insertRelationalRow in MemTable
add measurementColumnCnt
---
.../plan/analyze/ClusterPartitionFetcher.java | 2 +-
.../plan/planner/plan/node/write/InsertNode.java | 35 +++++++++----
.../relational/metadata/TableMetadataImpl.java | 6 +++
.../plan/relational/sql/ast/FetchDevice.java | 5 +-
.../dataregion/memtable/AbstractMemTable.java | 60 ++++++++++++++--------
.../java/org/apache/iotdb/db/utils/MemUtils.java | 26 +++++++---
.../org/apache/iotdb/db/utils/MemUtilsTest.java | 3 +-
7 files changed, 96 insertions(+), 41 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c6e30ea2d59..2f1e93d4598 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -300,7 +300,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
PathPatternTree tree = new PathPatternTree();
tree.appendPathPattern(new PartialPath(database + "." +
MULTI_LEVEL_PATH_WILDCARD));
TSchemaPartitionTableResp schemaPartitionTableResp =
- client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
+
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(tree));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition =
parseSchemaPartitionTableResp(schemaPartitionTableResp);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 5cc0d9b1f7d..ec85a8f2393 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -50,12 +50,14 @@ import java.util.Objects;
public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
- /** this insert node doesn't need to participate in iot consensus */
+ /**
+ * this insert node doesn't need to participate in iot consensus
+ */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/**
- * if use id table, this filed is id form of device path <br>
- * if not, this filed is device path<br>
+ * if use id table, this filed is id form of device path <br> if not, this
filed is device
+ * path<br>
*/
protected PartialPath devicePath;
@@ -71,8 +73,7 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected int failedMeasurementNumber = 0;
/**
- * device id reference, for reuse device id in both id table and memtable
<br>
- * used in memtable
+ * device id reference, for reuse device id in both id table and memtable
<br> used in memtable
*/
protected IDeviceID deviceID;
@@ -82,7 +83,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- /** Physical address of data region after splitting */
+ /**
+ * Physical address of data region after splitting
+ */
protected TRegionReplicaSet dataRegionReplicaSet;
protected ProgressIndex progressIndex;
@@ -153,6 +156,14 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return measurements;
}
+ public int measureColumnCnt() {
+ if (columnCategories == null) {
+ return measurements.length;
+ }
+ return (int) Arrays.stream(columnCategories)
+ .filter(col -> col == TsTableColumnCategory.MEASUREMENT).count();
+ }
+
public boolean isValidMeasurement(int i) {
return measurementSchemas != null
&& measurementSchemas[i] != null
@@ -204,7 +215,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return searchIndex;
}
- /** Search index should start from 1 */
+ /**
+ * Search index should start from 1
+ */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
}
@@ -221,7 +234,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
// region Serialization methods for WAL
- /** Serialized size of measurement schemas, ignoring failed time series */
+ /**
+ * Serialized size of measurement schemas, ignoring failed time series
+ */
protected int serializeMeasurementSchemasSize() {
int byteLen = 0;
for (int i = 0; i < measurements.length; i++) {
@@ -234,7 +249,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
return byteLen;
}
- /** Serialize measurement schemas, ignoring failed time series */
+ /**
+ * Serialize measurement schemas, ignoring failed time series
+ */
protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
for (int i = 0; i < measurements.length; i++) {
// ignore failed partial insert
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index b017f8d3d72..ac99dec07f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -324,6 +324,12 @@ public class TableMetadataImpl implements Metadata {
TableDeviceSchemaValidator.getInstance().validateDeviceSchema(schemaValidation,
context);
}
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams, String userName)
{
+ return partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams,
userName);
+ }
+
@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
index d7c37ac1d14..e27bfc2bb31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
@@ -58,10 +58,9 @@ public class FetchDevice extends Statement {
if (partitionKeyList == null) {
List<IDeviceID> partitionKeyList = new ArrayList<>();
for (Object[] rawId : deviceIdList) {
- String[] partitionKey = new String[rawId.length + 1];
- partitionKey[0] = tableName;
+ String[] partitionKey = new String[rawId.length];
for (int i = 0; i < rawId.length; i++) {
- partitionKey[i + 1] = Objects.toString(rawId[i].toString());
+ partitionKey[i] = Objects.toString(rawId[i].toString());
}
partitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 7ecd348a2ca..2a9f4424bd3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -83,12 +83,17 @@ import java.util.stream.LongStream;
import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
public abstract class AbstractMemTable implements IMemTable {
- /** Each memTable node has a unique int value identifier, init when
recovering wal. */
+
+ /**
+ * Each memTable node has a unique int value identifier, init when
recovering wal.
+ */
public static final AtomicLong memTableIdCounter = new AtomicLong(-1);
private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + 2 *
Integer.BYTES + 6 * Long.BYTES;
- /** DeviceId -> chunkGroup(MeasurementId -> chunk). */
+ /**
+ * DeviceId -> chunkGroup(MeasurementId -> chunk).
+ */
private final Map<IDeviceID, IWritableMemChunkGroup> memTableMap;
private static final DeviceIDFactory deviceIDFactory =
DeviceIDFactory.getInstance();
@@ -98,7 +103,9 @@ public abstract class AbstractMemTable implements IMemTable {
private final int avgSeriesPointNumThreshold =
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- /** Memory size of data points, including TEXT values. */
+ /**
+ * Memory size of data points, including TEXT values.
+ */
private long memSize = 0;
/**
@@ -121,7 +128,9 @@ public abstract class AbstractMemTable implements IMemTable
{
private final long createdTime = System.currentTimeMillis();
- /** this time is updated by the timed flush, same as createdTime when the
feature is disabled. */
+ /**
+ * this time is updated by the timed flush, same as createdTime when the
feature is disabled.
+ */
private long updateTime = createdTime;
/**
@@ -164,7 +173,7 @@ public abstract class AbstractMemTable implements IMemTable
{
/**
* Create this MemChunk if it's not exist.
*
- * @param deviceId device id
+ * @param deviceId device id
* @param schemaList measurement schemaList
* @return this MemChunkGroup
*/
@@ -268,7 +277,9 @@ public abstract class AbstractMemTable implements IMemTable
{
List<TSDataType> dataTypes = new ArrayList<>();
for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
// Use measurements[i] to ignore failed partial insert
- if (measurements[i] == null || values[i] == null) {
+ if (measurements[i] == null || values[i] == null
+ || insertRowNode.getColumnCategories() != null
+ && insertRowNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT) {
schemaList.add(null);
continue;
}
@@ -279,10 +290,11 @@ public abstract class AbstractMemTable implements
IMemTable {
if (schemaList.isEmpty()) {
return;
}
- memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
+ memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values,
+ insertRowNode.getColumnCategories());
writeAlignedRow(insertRowNode.getDeviceID(), schemaList,
insertRowNode.getTime(), values);
int pointsInserted =
- insertRowNode.getMeasurements().length -
insertRowNode.getFailedMeasurementNumber();
+ insertRowNode.getMeasurementColumnCnt() -
insertRowNode.getFailedMeasurementNumber();
totalPointsNum += pointsInserted;
MetricService.getInstance()
@@ -360,7 +372,7 @@ public abstract class AbstractMemTable implements IMemTable
{
memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end,
results);
int pointsInserted =
(insertTabletNode.getMeasurementColumnCnt()
- - insertTabletNode.getFailedMeasurementNumber())
+ - insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
MetricService.getInstance()
@@ -448,7 +460,7 @@ public abstract class AbstractMemTable implements IMemTable
{
for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
if (insertTabletNode.getColumns()[i] == null
|| (insertTabletNode.getColumnCategories() != null
- && insertTabletNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT)) {
+ && insertTabletNode.getColumnCategories()[i] !=
TsTableColumnCategory.MEASUREMENT)) {
schemaList.add(null);
} else {
schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
@@ -807,7 +819,7 @@ public abstract class AbstractMemTable implements IMemTable
{
private long[] calculateStartEndTime(long[] timestamps, List<BitMap>
bitMaps) {
if (bitMaps.isEmpty()) {
- return new long[] {timestamps[0], timestamps[timestamps.length - 1]};
+ return new long[]{timestamps[0], timestamps[timestamps.length - 1]};
}
long startTime = -1, endTime = -1;
for (int i = 0; i < timestamps.length; i++) {
@@ -827,7 +839,7 @@ public abstract class AbstractMemTable implements IMemTable
{
break;
}
}
- return new long[] {startTime, endTime};
+ return new long[]{startTime, endTime};
}
private IChunkMetadata buildChunkMetaDataForMemoryChunk(
@@ -876,12 +888,13 @@ public abstract class AbstractMemTable implements
IMemTable {
/**
* Delete data by path and timeStamp.
*
- * @param originalPath the original path pattern or full path to be used to
match timeseries, e.g.
- * root.sg.**, root.sg.*.s, root.sg.d.s
- * @param devicePath one of the device path patterns generated by original
path, e.g. given
- * original path root.sg.** and the device path may be root.sg or
root.sg.**
+ * @param originalPath the original path pattern or full path to be used
to match timeseries,
+ * e.g. root.sg.**, root.sg.*.s, root.sg.d.s
+ * @param devicePath one of the device path patterns generated by
original path, e.g. given
+ * original path root.sg.** and the device path may be
root.sg or
+ * root.sg.**
* @param startTimestamp the lower-bound of deletion time.
- * @param endTimestamp the upper-bound of deletion time
+ * @param endTimestamp the upper-bound of deletion time
*/
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
@@ -990,7 +1003,9 @@ public abstract class AbstractMemTable implements
IMemTable {
return createdTime;
}
- /** Check whether updated since last get method */
+ /**
+ * Check whether updated since last get method
+ */
@Override
public long getUpdateTime() {
if (lastTotalPointsNum != totalPointsNum) {
@@ -1010,7 +1025,9 @@ public abstract class AbstractMemTable implements
IMemTable {
this.flushStatus = flushStatus;
}
- /** Notice: this method is concurrent unsafe. */
+ /**
+ * Notice: this method is concurrent unsafe.
+ */
@Override
public int serializedSize() {
if (isSignalMemTable()) {
@@ -1025,7 +1042,9 @@ public abstract class AbstractMemTable implements
IMemTable {
return size;
}
- /** Notice: this method is concurrent unsafe. */
+ /**
+ * Notice: this method is concurrent unsafe.
+ */
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
// TODO:[WAL]
@@ -1088,6 +1107,7 @@ public abstract class AbstractMemTable implements
IMemTable {
}
public static class Factory {
+
private Factory() {
// Empty constructor
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 04266bbd529..70cc9befd69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -46,7 +47,8 @@ public class MemUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(MemUtils.class);
- private MemUtils() {}
+ private MemUtils() {
+ }
/**
* Function for obtaining the value size. For text values, there are two
conditions: 1. During
@@ -81,11 +83,13 @@ public class MemUtils {
* Function for obtaining the value size. For text values, their size has
already been added to
* memory before insertion
*/
- public static long getAlignedRowRecordSize(List<TSDataType> dataTypes,
Object[] value) {
+ public static long getAlignedRowRecordSize(List<TSDataType> dataTypes,
Object[] value,
+ TsTableColumnCategory[] columnCategories) {
// time and index size
long memSize = 8L + 4L;
for (int i = 0; i < dataTypes.size(); i++) {
- if (value[i] == null || dataTypes.get(i).isBinary()) {
+ if (value[i] == null || dataTypes.get(i).isBinary()
+ || columnCategories != null && columnCategories[i] !=
TsTableColumnCategory.MEASUREMENT) {
continue;
}
memSize += dataTypes.get(i).getDataTypeSize();
@@ -156,7 +160,9 @@ public class MemUtils {
return memSize;
}
- /** Calculate how much memory will be used if the given record is written to
sequence file. */
+ /**
+ * Calculate how much memory will be used if the given record is written to
sequence file.
+ */
public static long getTsRecordMem(TSRecord tsRecord) {
long memUsed = 8; // time
memUsed += 8; // deviceId reference
@@ -168,13 +174,17 @@ public class MemUtils {
return memUsed;
}
- /** Function for getting the memory size of the given string. */
+ /**
+ * Function for getting the memory size of the given string.
+ */
public static long getStringMem(String str) {
// wide char (2 bytes each) and 64B String overhead
return str.length() * 2L + 64L;
}
- /** Function for getting the memory size of the given data point. */
+ /**
+ * Function for getting the memory size of the given data point.
+ */
public static long getDataPointMem(DataPoint dataPoint) {
// type reference
long memUsed = 8;
@@ -206,7 +216,9 @@ public class MemUtils {
return memUsed;
}
- /** Function for converting the byte count result to readable string. */
+ /**
+ * Function for converting the byte count result to readable string.
+ */
public static String bytesCntToStr(long inputCnt) {
long cnt = inputCnt;
long gbs = cnt / IoTDBConstant.GB;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
index bea447b649a..450e39e5851 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
@@ -87,7 +87,8 @@ public class MemUtilsTest {
dataTypes.add(TSDataType.TEXT);
// time and index size
sizeSum += 8 + 4;
- Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes,
row));
+ Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes,
row,
+ null));
}
@Test