This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this push:
     new 7313ad4850c fix getOrCreateSchemaRegion FetchDevice add insertRelationalRow in MemTable add measurementColumnCnt
7313ad4850c is described below

commit 7313ad4850c2e6e2103c0502815ff9e85130eba7
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Jul 9 11:09:37 2024 +0800

    fix getOrCreateSchemaRegion FetchDevice
    add insertRelationalRow in MemTable
    add measurementColumnCnt
---
 .../plan/analyze/ClusterPartitionFetcher.java      |  2 +-
 .../plan/planner/plan/node/write/InsertNode.java   | 35 +++++++++----
 .../relational/metadata/TableMetadataImpl.java     |  6 +++
 .../plan/relational/sql/ast/FetchDevice.java       |  5 +-
 .../dataregion/memtable/AbstractMemTable.java      | 60 ++++++++++++++--------
 .../java/org/apache/iotdb/db/utils/MemUtils.java   | 26 +++++++---
 .../org/apache/iotdb/db/utils/MemUtilsTest.java    |  3 +-
 7 files changed, 96 insertions(+), 41 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c6e30ea2d59..2f1e93d4598 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -300,7 +300,7 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
         PathPatternTree tree = new PathPatternTree();
         tree.appendPathPattern(new PartialPath(database + "." + 
MULTI_LEVEL_PATH_WILDCARD));
         TSchemaPartitionTableResp schemaPartitionTableResp =
-            client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
+            
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(tree));
         if (schemaPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           schemaPartition = 
parseSchemaPartitionTableResp(schemaPartitionTableResp);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 5cc0d9b1f7d..ec85a8f2393 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -50,12 +50,14 @@ import java.util.Objects;
 
 public abstract class InsertNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
-  /** this insert node doesn't need to participate in iot consensus */
+  /**
+   * this insert node doesn't need to participate in iot consensus
+   */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
   /**
-   * if use id table, this filed is id form of device path <br>
-   * if not, this filed is device path<br>
+   * if use id table, this filed is id form of device path <br> if not, this 
filed is device
+   * path<br>
    */
   protected PartialPath devicePath;
 
@@ -71,8 +73,7 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   protected int failedMeasurementNumber = 0;
 
   /**
-   * device id reference, for reuse device id in both id table and memtable 
<br>
-   * used in memtable
+   * device id reference, for reuse device id in both id table and memtable 
<br> used in memtable
    */
   protected IDeviceID deviceID;
 
@@ -82,7 +83,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
-  /** Physical address of data region after splitting */
+  /**
+   * Physical address of data region after splitting
+   */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
   protected ProgressIndex progressIndex;
@@ -153,6 +156,14 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return measurements;
   }
 
+  public int measureColumnCnt() {
+    if (columnCategories == null) {
+      return measurements.length;
+    }
+    return (int) Arrays.stream(columnCategories)
+        .filter(col -> col == TsTableColumnCategory.MEASUREMENT).count();
+  }
+
   public boolean isValidMeasurement(int i) {
     return measurementSchemas != null
         && measurementSchemas[i] != null
@@ -204,7 +215,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return searchIndex;
   }
 
-  /** Search index should start from 1 */
+  /**
+   * Search index should start from 1
+   */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
   }
@@ -221,7 +234,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   // region Serialization methods for WAL
 
-  /** Serialized size of measurement schemas, ignoring failed time series */
+  /**
+   * Serialized size of measurement schemas, ignoring failed time series
+   */
   protected int serializeMeasurementSchemasSize() {
     int byteLen = 0;
     for (int i = 0; i < measurements.length; i++) {
@@ -234,7 +249,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return byteLen;
   }
 
-  /** Serialize measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurement schemas, ignoring failed time series
+   */
   protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
     for (int i = 0; i < measurements.length; i++) {
       // ignore failed partial insert
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index b017f8d3d72..ac99dec07f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -324,6 +324,12 @@ public class TableMetadataImpl implements Metadata {
     
TableDeviceSchemaValidator.getInstance().validateDeviceSchema(schemaValidation, 
context);
   }
 
+  @Override
+  public DataPartition getOrCreateDataPartition(
+      List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) 
{
+    return partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, 
userName);
+  }
+
   @Override
   public SchemaPartition getOrCreateSchemaPartition(
       String database, List<IDeviceID> deviceIDList, String userName) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
index d7c37ac1d14..e27bfc2bb31 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/FetchDevice.java
@@ -58,10 +58,9 @@ public class FetchDevice extends Statement {
     if (partitionKeyList == null) {
       List<IDeviceID> partitionKeyList = new ArrayList<>();
       for (Object[] rawId : deviceIdList) {
-        String[] partitionKey = new String[rawId.length + 1];
-        partitionKey[0] = tableName;
+        String[] partitionKey = new String[rawId.length];
         for (int i = 0; i < rawId.length; i++) {
-          partitionKey[i + 1] = Objects.toString(rawId[i].toString());
+          partitionKey[i] = Objects.toString(rawId[i].toString());
         }
         
partitionKeyList.add(IDeviceID.Factory.DEFAULT_FACTORY.create(partitionKey));
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 7ecd348a2ca..2a9f4424bd3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -83,12 +83,17 @@ import java.util.stream.LongStream;
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
 
 public abstract class AbstractMemTable implements IMemTable {
-  /** Each memTable node has a unique int value identifier, init when 
recovering wal. */
+
+  /**
+   * Each memTable node has a unique int value identifier, init when 
recovering wal.
+   */
   public static final AtomicLong memTableIdCounter = new AtomicLong(-1);
 
   private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + 2 * 
Integer.BYTES + 6 * Long.BYTES;
 
-  /** DeviceId -> chunkGroup(MeasurementId -> chunk). */
+  /**
+   * DeviceId -> chunkGroup(MeasurementId -> chunk).
+   */
   private final Map<IDeviceID, IWritableMemChunkGroup> memTableMap;
 
   private static final DeviceIDFactory deviceIDFactory = 
DeviceIDFactory.getInstance();
@@ -98,7 +103,9 @@ public abstract class AbstractMemTable implements IMemTable {
   private final int avgSeriesPointNumThreshold =
       
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
 
-  /** Memory size of data points, including TEXT values. */
+  /**
+   * Memory size of data points, including TEXT values.
+   */
   private long memSize = 0;
 
   /**
@@ -121,7 +128,9 @@ public abstract class AbstractMemTable implements IMemTable 
{
 
   private final long createdTime = System.currentTimeMillis();
 
-  /** this time is updated by the timed flush, same as createdTime when the 
feature is disabled. */
+  /**
+   * this time is updated by the timed flush, same as createdTime when the 
feature is disabled.
+   */
   private long updateTime = createdTime;
 
   /**
@@ -164,7 +173,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
   /**
    * Create this MemChunk if it's not exist.
    *
-   * @param deviceId device id
+   * @param deviceId   device id
    * @param schemaList measurement schemaList
    * @return this MemChunkGroup
    */
@@ -268,7 +277,9 @@ public abstract class AbstractMemTable implements IMemTable 
{
     List<TSDataType> dataTypes = new ArrayList<>();
     for (int i = 0; i < insertRowNode.getMeasurements().length; i++) {
       // Use measurements[i] to ignore failed partial insert
-      if (measurements[i] == null || values[i] == null) {
+      if (measurements[i] == null || values[i] == null
+          || insertRowNode.getColumnCategories() != null
+          && insertRowNode.getColumnCategories()[i] != 
TsTableColumnCategory.MEASUREMENT) {
         schemaList.add(null);
         continue;
       }
@@ -279,10 +290,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
     if (schemaList.isEmpty()) {
       return;
     }
-    memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
+    memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values,
+        insertRowNode.getColumnCategories());
     writeAlignedRow(insertRowNode.getDeviceID(), schemaList, 
insertRowNode.getTime(), values);
     int pointsInserted =
-        insertRowNode.getMeasurements().length - 
insertRowNode.getFailedMeasurementNumber();
+        insertRowNode.getMeasurementColumnCnt() - 
insertRowNode.getFailedMeasurementNumber();
     totalPointsNum += pointsInserted;
 
     MetricService.getInstance()
@@ -360,7 +372,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, 
results);
       int pointsInserted =
           (insertTabletNode.getMeasurementColumnCnt()
-                  - insertTabletNode.getFailedMeasurementNumber())
+              - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
       MetricService.getInstance()
@@ -448,7 +460,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
       if (insertTabletNode.getColumns()[i] == null
           || (insertTabletNode.getColumnCategories() != null
-              && insertTabletNode.getColumnCategories()[i] != 
TsTableColumnCategory.MEASUREMENT)) {
+          && insertTabletNode.getColumnCategories()[i] != 
TsTableColumnCategory.MEASUREMENT)) {
         schemaList.add(null);
       } else {
         schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
@@ -807,7 +819,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
 
   private long[] calculateStartEndTime(long[] timestamps, List<BitMap> 
bitMaps) {
     if (bitMaps.isEmpty()) {
-      return new long[] {timestamps[0], timestamps[timestamps.length - 1]};
+      return new long[]{timestamps[0], timestamps[timestamps.length - 1]};
     }
     long startTime = -1, endTime = -1;
     for (int i = 0; i < timestamps.length; i++) {
@@ -827,7 +839,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         break;
       }
     }
-    return new long[] {startTime, endTime};
+    return new long[]{startTime, endTime};
   }
 
   private IChunkMetadata buildChunkMetaDataForMemoryChunk(
@@ -876,12 +888,13 @@ public abstract class AbstractMemTable implements 
IMemTable {
   /**
    * Delete data by path and timeStamp.
    *
-   * @param originalPath the original path pattern or full path to be used to 
match timeseries, e.g.
-   *     root.sg.**, root.sg.*.s, root.sg.d.s
-   * @param devicePath one of the device path patterns generated by original 
path, e.g. given
-   *     original path root.sg.** and the device path may be root.sg or 
root.sg.**
+   * @param originalPath   the original path pattern or full path to be used 
to match timeseries,
+   *                       e.g. root.sg.**, root.sg.*.s, root.sg.d.s
+   * @param devicePath     one of the device path patterns generated by 
original path, e.g. given
+   *                       original path root.sg.** and the device path may be 
root.sg or
+   *                       root.sg.**
    * @param startTimestamp the lower-bound of deletion time.
-   * @param endTimestamp the upper-bound of deletion time
+   * @param endTimestamp   the upper-bound of deletion time
    */
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   @Override
@@ -990,7 +1003,9 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return createdTime;
   }
 
-  /** Check whether updated since last get method */
+  /**
+   * Check whether updated since last get method
+   */
   @Override
   public long getUpdateTime() {
     if (lastTotalPointsNum != totalPointsNum) {
@@ -1010,7 +1025,9 @@ public abstract class AbstractMemTable implements 
IMemTable {
     this.flushStatus = flushStatus;
   }
 
-  /** Notice: this method is concurrent unsafe. */
+  /**
+   * Notice: this method is concurrent unsafe.
+   */
   @Override
   public int serializedSize() {
     if (isSignalMemTable()) {
@@ -1025,7 +1042,9 @@ public abstract class AbstractMemTable implements 
IMemTable {
     return size;
   }
 
-  /** Notice: this method is concurrent unsafe. */
+  /**
+   * Notice: this method is concurrent unsafe.
+   */
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     // TODO:[WAL]
@@ -1088,6 +1107,7 @@ public abstract class AbstractMemTable implements 
IMemTable {
   }
 
   public static class Factory {
+
     private Factory() {
       // Empty constructor
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 04266bbd529..70cc9befd69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -46,7 +47,8 @@ public class MemUtils {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MemUtils.class);
 
-  private MemUtils() {}
+  private MemUtils() {
+  }
 
   /**
    * Function for obtaining the value size. For text values, there are two 
conditions: 1. During
@@ -81,11 +83,13 @@ public class MemUtils {
    * Function for obtaining the value size. For text values, their size has 
already been added to
    * memory before insertion
    */
-  public static long getAlignedRowRecordSize(List<TSDataType> dataTypes, 
Object[] value) {
+  public static long getAlignedRowRecordSize(List<TSDataType> dataTypes, 
Object[] value,
+      TsTableColumnCategory[] columnCategories) {
     // time and index size
     long memSize = 8L + 4L;
     for (int i = 0; i < dataTypes.size(); i++) {
-      if (value[i] == null || dataTypes.get(i).isBinary()) {
+      if (value[i] == null || dataTypes.get(i).isBinary()
+          || columnCategories != null && columnCategories[i] != 
TsTableColumnCategory.MEASUREMENT) {
         continue;
       }
       memSize += dataTypes.get(i).getDataTypeSize();
@@ -156,7 +160,9 @@ public class MemUtils {
     return memSize;
   }
 
-  /** Calculate how much memory will be used if the given record is written to 
sequence file. */
+  /**
+   * Calculate how much memory will be used if the given record is written to 
sequence file.
+   */
   public static long getTsRecordMem(TSRecord tsRecord) {
     long memUsed = 8; // time
     memUsed += 8; // deviceId reference
@@ -168,13 +174,17 @@ public class MemUtils {
     return memUsed;
   }
 
-  /** Function for getting the memory size of the given string. */
+  /**
+   * Function for getting the memory size of the given string.
+   */
   public static long getStringMem(String str) {
     // wide char (2 bytes each) and 64B String overhead
     return str.length() * 2L + 64L;
   }
 
-  /** Function for getting the memory size of the given data point. */
+  /**
+   * Function for getting the memory size of the given data point.
+   */
   public static long getDataPointMem(DataPoint dataPoint) {
     // type reference
     long memUsed = 8;
@@ -206,7 +216,9 @@ public class MemUtils {
     return memUsed;
   }
 
-  /** Function for converting the byte count result to readable string. */
+  /**
+   * Function for converting the byte count result to readable string.
+   */
   public static String bytesCntToStr(long inputCnt) {
     long cnt = inputCnt;
     long gbs = cnt / IoTDBConstant.GB;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
index bea447b649a..450e39e5851 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
@@ -87,7 +87,8 @@ public class MemUtilsTest {
     dataTypes.add(TSDataType.TEXT);
     // time and index size
     sizeSum += 8 + 4;
-    Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes, 
row));
+    Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes, 
row,
+        null));
   }
 
   @Test

Reply via email to