This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 07758a60f710db45a72553443e4c12a457152256 Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Dec 9 15:00:00 2025 +0800 Pipe: Modify the TableRawReq deserialization method to support direct conversion to TableStatement. (#16844) * Pipe: Modify the TableRawReq deserialization method to support direct conversion to TableStatement. * fix * fix * fix * fix * fix * update * update * update * refactor: optimize TabletStatementConverter according to code review - Optimize times array copy: skip copy when lengths are equal, use System.arraycopy - Add warning logs when times array is null or too small - Ensure all arrays (values, times, bitMaps) are copied to rowSize length for immutability - Filter out null columns when converting Statement to Tablet - Rename idColumnIndices to tagColumnIndices - Add skipString method to avoid constructing temporary objects - Add comments explaining skipped fields in readMeasurement - Use direct buffer position increment instead of reading bytes for skipping - Ensure all column values are copied to ensure immutability * update * update (cherry picked from commit 13b0582dfb1a63a3f242b9545304d0d9fdede5cd) --- .../request/PipeTransferTabletBatchReqV2.java | 7 +- .../request/PipeTransferTabletRawReq.java | 110 +++- .../request/PipeTransferTabletRawReqV2.java | 50 +- .../pipe/sink/util/TabletStatementConverter.java | 476 ++++++++++++++++ .../sink/util/sorter/InsertEventDataAdapter.java | 127 +++++ .../util/sorter/InsertTabletStatementAdapter.java | 118 ++++ ...EventSorter.java => PipeInsertEventSorter.java} | 94 +++- .../sorter/PipeTableModelTabletEventSorter.java | 67 ++- .../sorter/PipeTreeModelTabletEventSorter.java | 48 +- .../db/pipe/sink/util/sorter/TabletAdapter.java | 113 ++++ .../plan/statement/crud/InsertBaseStatement.java | 10 + .../plan/statement/crud/InsertTabletStatement.java | 197 +++++++ .../pipe/sink/PipeDataNodeThriftRequestTest.java | 4 +- .../db/pipe/sink/PipeStatementEventSorterTest.java | 313 +++++++++++ 
.../sink/util/TabletStatementConverterTest.java | 607 +++++++++++++++++++++ 15 files changed, 2257 insertions(+), 84 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java index d9c3fabfae8..c136ffbe7d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java @@ -33,7 +33,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; @@ -247,11 +246,7 @@ public class PipeTransferTabletBatchReqV2 extends TPipeTransferReq { size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add( - PipeTransferTabletRawReqV2.toTPipeTransferRawReq( - Tablet.deserialize(transferReq.body), - ReadWriteIOUtils.readBool(transferReq.body), - ReadWriteIOUtils.readString(transferReq.body))); + batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body)); } batchReq.version = transferReq.version; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 7da44f297d1..af9b37edbf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.request; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -43,10 +44,25 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletRawReq.class); - protected transient Tablet tablet; + protected transient InsertTabletStatement statement; + protected transient boolean isAligned; + protected transient Tablet tablet; + /** + * Get Tablet. If tablet is null, convert from statement. + * + * @return Tablet object + */ public Tablet getTablet() { + if (tablet == null && statement != null) { + try { + tablet = statement.convertToTablet(); + } catch (final MetadataException e) { + LOGGER.warn("Failed to convert statement to tablet.", e); + return null; + } + } return tablet; } @@ -54,16 +70,29 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { return isAligned; } + /** + * Construct Statement. If statement already exists, return it. Otherwise, convert from tablet. 
+ * + * @return InsertTabletStatement + */ public InsertTabletStatement constructStatement() { + if (statement != null) { + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + return statement; + } + + // Sort and deduplicate tablet before converting new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); try { if (isTabletEmpty(tablet)) { // Empty statement, will be filtered after construction - return new InsertTabletStatement(); + statement = new InsertTabletStatement(); + return statement; } - return new InsertTabletStatement(tablet, isAligned, null); + statement = new InsertTabletStatement(tablet, isAligned, null); + return statement; } catch (final MetadataException e) { LOGGER.warn("Generate Statement from tablet {} error.", tablet, e); return null; @@ -107,8 +136,20 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) { final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - tabletReq.tablet = Tablet.deserialize(transferReq.body); - tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body); + final ByteBuffer buffer = transferReq.body; + final int startPosition = buffer.position(); + try { + // V1: no databaseName, readDatabaseName = false + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false); + tabletReq.isAligned = insertTabletStatement.isAligned(); + // devicePath is already set in deserializeStatementFromTabletFormat for V1 format + tabletReq.statement = insertTabletStatement; + } catch (final Exception e) { + buffer.position(startPosition); + tabletReq.tablet = Tablet.deserialize(buffer); + tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); + } tabletReq.version = transferReq.version; tabletReq.type = transferReq.type; @@ -118,18 +159,56 @@ public class 
PipeTransferTabletRawReq extends TPipeTransferReq { /////////////////////////////// Air Gap /////////////////////////////// - public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean isAligned) - throws IOException { + /** + * Serialize to bytes. If tablet is null, convert from statement first. + * + * @return serialized bytes + * @throws IOException if serialization fails + */ + public byte[] toTPipeTransferBytes() throws IOException { + Tablet tabletToSerialize = tablet; + boolean isAlignedToSerialize = isAligned; + + // If tablet is null, convert from statement + if (tabletToSerialize == null && statement != null) { + try { + tabletToSerialize = statement.convertToTablet(); + isAlignedToSerialize = statement.isAligned(); + } catch (final MetadataException e) { + throw new IOException("Failed to convert statement to tablet for serialization", e); + } + } + + if (tabletToSerialize == null) { + throw new IOException("Cannot serialize: both tablet and statement are null"); + } + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(), outputStream); ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), outputStream); - tablet.serialize(outputStream); - ReadWriteIOUtils.write(isAligned, outputStream); + tabletToSerialize.serialize(outputStream); + ReadWriteIOUtils.write(isAlignedToSerialize, outputStream); return byteArrayOutputStream.toByteArray(); } } + /** + * Static method for backward compatibility. Creates a temporary instance and serializes. 
+ * + * @param tablet Tablet to serialize + * @param isAligned whether aligned + * @return serialized bytes + * @throws IOException if serialization fails + */ + public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean isAligned) + throws IOException { + final PipeTransferTabletRawReq req = new PipeTransferTabletRawReq(); + req.tablet = tablet; + req.isAligned = isAligned; + return req.toTPipeTransferBytes(); + } + /////////////////////////////// Object /////////////////////////////// @Override @@ -141,7 +220,16 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { return false; } final PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj; - return Objects.equals(tablet, that.tablet) + // Compare statement if both have it, otherwise compare tablet + if (statement != null && that.statement != null) { + return Objects.equals(statement, that.statement) + && isAligned == that.isAligned + && version == that.version + && type == that.type + && Objects.equals(body, that.body); + } + // Fallback to tablet comparison + return Objects.equals(getTablet(), that.getTablet()) && isAligned == that.isAligned && version == that.version && type == that.type @@ -150,6 +238,6 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq { @Override public int hashCode() { - return Objects.hash(tablet, isAligned, version, type, body); + return Objects.hash(getTablet(), isAligned, version, type, body); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java index 43d8501252c..3c5f420a317 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.payload.evolvable.request; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -52,6 +53,16 @@ public class PipeTransferTabletRawReqV2 extends PipeTransferTabletRawReq { @Override public InsertTabletStatement constructStatement() { + if (statement != null) { + if (Objects.isNull(dataBaseName)) { + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + } else { + new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary(); + } + + return statement; + } + if (Objects.isNull(dataBaseName)) { new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); } else { @@ -86,6 +97,16 @@ public class PipeTransferTabletRawReqV2 extends PipeTransferTabletRawReq { return tabletReq; } + public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final ByteBuffer buffer) { + final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2(); + + tabletReq.deserializeTPipeTransferRawReq(buffer); + tabletReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType(); + + return tabletReq; + } + /////////////////////////////// Thrift /////////////////////////////// public static PipeTransferTabletRawReqV2 toTPipeTransferReq( @@ -114,13 +135,11 @@ public class 
PipeTransferTabletRawReqV2 extends PipeTransferTabletRawReq { final TPipeTransferReq transferReq) { final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2(); - tabletReq.tablet = Tablet.deserialize(transferReq.body); - tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body); - tabletReq.dataBaseName = ReadWriteIOUtils.readString(transferReq.body); + tabletReq.deserializeTPipeTransferRawReq(transferReq.body); + tabletReq.body = transferReq.body; tabletReq.version = transferReq.version; tabletReq.type = transferReq.type; - tabletReq.body = transferReq.body; return tabletReq; } @@ -161,4 +180,27 @@ public class PipeTransferTabletRawReqV2 extends PipeTransferTabletRawReq { public int hashCode() { return Objects.hash(super.hashCode(), dataBaseName); } + + /////////////////////////////// Util /////////////////////////////// + + public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) { + final int startPosition = buffer.position(); + try { + // V2: read databaseName, readDatabaseName = true + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true); + this.isAligned = insertTabletStatement.isAligned(); + // databaseName is already set in deserializeStatementFromTabletFormat when + // readDatabaseName=true + this.dataBaseName = insertTabletStatement.getDatabaseName().orElse(null); + this.statement = insertTabletStatement; + } catch (final Exception e) { + // If Statement deserialization fails, fallback to Tablet format + // Reset buffer position for Tablet deserialization + buffer.position(startPosition); + this.tablet = Tablet.deserialize(buffer); + this.isAligned = ReadWriteIOUtils.readBool(buffer); + this.dataBaseName = ReadWriteIOUtils.readString(buffer); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java new file mode 100644 index 00000000000..c5b9ebed4d5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.sink.util; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Utility class for converting between InsertTabletStatement and Tablet format ByteBuffer. This + * avoids creating intermediate Tablet objects and directly converts between formats with only the + * fields needed. 
+ */ +public class TabletStatementConverter { + + private static final Logger LOGGER = LoggerFactory.getLogger(TabletStatementConverter.class); + + // Memory calculation constants - extracted from RamUsageEstimator for better performance + private static final long NUM_BYTES_ARRAY_HEADER = + org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; + private static final long NUM_BYTES_OBJECT_REF = + org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; + private static final long NUM_BYTES_OBJECT_HEADER = + org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; + private static final long SIZE_OF_ARRAYLIST = + org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance(java.util.ArrayList.class); + private static final long SIZE_OF_BITMAP = + org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance( + org.apache.tsfile.utils.BitMap.class); + + private TabletStatementConverter() { + // Utility class, no instantiation + } + + /** + * Deserialize InsertTabletStatement from Tablet format ByteBuffer. + * + * @param byteBuffer ByteBuffer containing serialized data + * @param readDatabaseName whether to read databaseName from buffer (for V2 format) + * @return InsertTabletStatement with all fields set, including devicePath + */ + public static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, final boolean readDatabaseName) throws IllegalPathException { + final InsertTabletStatement statement = new InsertTabletStatement(); + + // Calculate memory size during deserialization, use INSTANCE_SIZE constant + long memorySize = InsertTabletStatement.getInstanceSize(); + + final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer); + + final int rowSize = ReadWriteIOUtils.readInt(byteBuffer); + + // deserialize schemas + final int schemaSize = + BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)) + ? 
ReadWriteIOUtils.readInt(byteBuffer) + : 0; + final String[] measurement = new String[schemaSize]; + final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize]; + final TSDataType[] dataTypes = new TSDataType[schemaSize]; + + // Calculate memory for arrays headers and references during deserialization + // measurements array: array header + object references + long measurementMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + + // dataTypes array: shallow size (array header + references) + long dataTypesMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + + // columnCategories array: shallow size (array header + references) + long columnCategoriesMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + + // tagColumnIndices (TAG columns): ArrayList base + array header + long tagColumnIndicesSize = SIZE_OF_ARRAYLIST; + tagColumnIndicesSize += NUM_BYTES_ARRAY_HEADER; + + // Deserialize and calculate memory in the same loop + for (int i = 0; i < schemaSize; i++) { + final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (hasSchema) { + final Pair<String, TSDataType> pair = readMeasurement(byteBuffer); + measurement[i] = pair.getLeft(); + dataTypes[i] = pair.getRight(); + columnCategories[i] = + TsTableColumnCategory.fromTsFileColumnCategory( + ColumnCategory.values()[byteBuffer.get()]); + + // Calculate memory for each measurement string + if (measurement[i] != null) { + measurementMemorySize += org.apache.tsfile.utils.RamUsageEstimator.sizeOf(measurement[i]); + } + + // Calculate memory for TAG column indices + if (columnCategories[i] != null && columnCategories[i].equals(TsTableColumnCategory.TAG)) { + tagColumnIndicesSize += + 
org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + Integer.BYTES + NUM_BYTES_OBJECT_HEADER) + + NUM_BYTES_OBJECT_REF; + } + } + } + + // Add all calculated memory to total + memorySize += measurementMemorySize; + memorySize += dataTypesMemorySize; + + // deserialize times and calculate memory during deserialization + final long[] times = new long[rowSize]; + // Calculate memory: array header + long size * rowSize + final long timesMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize); + + final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isTimesNotNull) { + for (int i = 0; i < rowSize; i++) { + times[i] = ReadWriteIOUtils.readLong(byteBuffer); + } + } + + // Add times memory to total + memorySize += timesMemorySize; + + // deserialize bitmaps and calculate memory during deserialization + final BitMap[] bitMaps; + final long bitMapsMemorySize; + + final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isBitMapsNotNull) { + // Use the method that returns both BitMap array and memory size + final Pair<BitMap[], Long> bitMapsAndMemory = + readBitMapsFromBufferWithMemory(byteBuffer, schemaSize); + bitMaps = bitMapsAndMemory.getLeft(); + bitMapsMemorySize = bitMapsAndMemory.getRight(); + } else { + // Calculate memory for empty BitMap array: array header + references + bitMaps = new BitMap[schemaSize]; + bitMapsMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + } + + // Add bitMaps memory to total + memorySize += bitMapsMemorySize; + + // Deserialize values and calculate memory during deserialization + final Object[] values; + final long valuesMemorySize; + + final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isValuesNotNull) { + // Use the method that 
returns both values array and memory size + final Pair<Object[], Long> valuesAndMemory = + readValuesFromBufferWithMemory(byteBuffer, dataTypes, schemaSize, rowSize); + values = valuesAndMemory.getLeft(); + valuesMemorySize = valuesAndMemory.getRight(); + } else { + // Calculate memory for empty values array: array header + references + values = new Object[schemaSize]; + valuesMemorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize); + } + + // Add values memory to total + memorySize += valuesMemorySize; + + final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer); + + statement.setMeasurements(measurement); + statement.setTimes(times); + statement.setBitMaps(bitMaps); + statement.setDataTypes(dataTypes); + statement.setColumns(values); + statement.setRowCount(rowSize); + statement.setAligned(isAligned); + + // Read databaseName if requested (V2 format) + if (readDatabaseName) { + final String databaseName = ReadWriteIOUtils.readString(byteBuffer); + if (databaseName != null) { + statement.setDatabaseName(databaseName); + statement.setWriteToTable(true); + // For table model, insertTargetName is table name, convert to lowercase + statement.setDevicePath(new PartialPath(insertTargetName.toLowerCase(), false)); + // Calculate memory for databaseName + memorySize += org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName); + + statement.setColumnCategories(columnCategories); + + memorySize += columnCategoriesMemorySize; + memorySize += tagColumnIndicesSize; + } else { + // For tree model, use DataNodeDevicePathCache + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName)); + statement.setColumnCategories(null); + } + } else { + // V1 format: no databaseName in buffer, always use DataNodeDevicePathCache + statement.setDevicePath( + DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName)); + 
statement.setColumnCategories(null); + } + + // Calculate memory for devicePath + memorySize += InsertNodeMemoryEstimator.sizeOfPartialPath(statement.getDevicePath()); + + // Set the pre-calculated memory size to avoid recalculation + statement.setRamBytesUsed(memorySize); + + return statement; + } + + /** + * Deserialize InsertTabletStatement from Tablet format ByteBuffer (V1 format, no databaseName). + * + * @param byteBuffer ByteBuffer containing serialized data + * @return InsertTabletStatement with devicePath set using DataNodeDevicePathCache + */ + public static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer) throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, false); + } + + /** + * Skip a string in ByteBuffer without reading it. This is more efficient than reading and + * discarding the string. + * + * @param buffer ByteBuffer to skip string from + */ + private static void skipString(final ByteBuffer buffer) { + final int size = ReadWriteIOUtils.readInt(buffer); + if (size > 0) { + buffer.position(buffer.position() + size); + } + } + + /** + * Read measurement name and data type from buffer, skipping other measurement schema fields + * (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement. 
+ * + * @param buffer ByteBuffer containing serialized measurement schema + * @return Pair of measurement name and data type + */ + private static Pair<String, TSDataType> readMeasurement(final ByteBuffer buffer) { + // Read measurement name and data type + final Pair<String, TSDataType> pair = + new Pair<>(ReadWriteIOUtils.readString(buffer), TSDataType.deserializeFrom(buffer)); + + // Skip encoding type (byte) and compression type (byte) - 2 bytes total + buffer.position(buffer.position() + 2); + + // Skip props map (Map<String, String>) + final int size = ReadWriteIOUtils.readInt(buffer); + if (size > 0) { + for (int i = 0; i < size; i++) { + // Skip key (String) and value (String) without constructing temporary objects + skipString(buffer); + skipString(buffer); + } + } + + return pair; + } + + /** + * Deserialize bitmaps and calculate memory size during deserialization. Returns a Pair of BitMap + * array and the calculated memory size. + */ + private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory( + final ByteBuffer byteBuffer, final int columns) { + final BitMap[] bitMaps = new BitMap[columns]; + + // Calculate memory: array header + object references + long memorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns); + + for (int i = 0; i < columns; i++) { + final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (hasBitMap) { + final int size = ReadWriteIOUtils.readInt(byteBuffer); + final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); + final byte[] byteArray = valueBinary.getValues(); + bitMaps[i] = new BitMap(size, byteArray); + + // Calculate memory for this BitMap: BitMap object + byte array + // BitMap shallow size + byte array (array header + array length) + memorySize += + SIZE_OF_BITMAP + + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + byteArray.length); + } + } + + return 
new Pair<>(bitMaps, memorySize); + } + + /** + * Deserialize values from buffer and calculate memory size during deserialization. Returns a Pair + * of values array and the calculated memory size. + * + * @param byteBuffer data values + * @param types data types + * @param columns column number + * @param rowSize row number + * @return Pair of values array and memory size + */ + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + private static Pair<Object[], Long> readValuesFromBufferWithMemory( + final ByteBuffer byteBuffer, final TSDataType[] types, final int columns, final int rowSize) { + final Object[] values = new Object[columns]; + + // Calculate memory: array header + object references + long memorySize = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns); + + for (int i = 0; i < columns; i++) { + final boolean isValueColumnsNotNull = + BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isValueColumnsNotNull && types[i] == null) { + continue; + } + + switch (types[i]) { + case BOOLEAN: + final boolean[] boolValues = new boolean[rowSize]; + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + } + } + values[i] = boolValues; + // Calculate memory for boolean array: array header + 1 byte per element (aligned) + memorySize += + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + rowSize); + break; + case INT32: + case DATE: + final int[] intValues = new int[rowSize]; + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + intValues[index] = ReadWriteIOUtils.readInt(byteBuffer); + } + } + values[i] = intValues; + // Calculate memory for int array: array header + 4 bytes per element (aligned) + memorySize += + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + 
NUM_BYTES_ARRAY_HEADER + (long) Integer.BYTES * rowSize); + break; + case INT64: + case TIMESTAMP: + final long[] longValues = new long[rowSize]; + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + longValues[index] = ReadWriteIOUtils.readLong(byteBuffer); + } + } + values[i] = longValues; + // Calculate memory for long array: array header + 8 bytes per element (aligned) + memorySize += + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize); + break; + case FLOAT: + final float[] floatValues = new float[rowSize]; + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + floatValues[index] = ReadWriteIOUtils.readFloat(byteBuffer); + } + } + values[i] = floatValues; + // Calculate memory for float array: array header + 4 bytes per element (aligned) + memorySize += + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + (long) Float.BYTES * rowSize); + break; + case DOUBLE: + final double[] doubleValues = new double[rowSize]; + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + doubleValues[index] = ReadWriteIOUtils.readDouble(byteBuffer); + } + } + values[i] = doubleValues; + // Calculate memory for double array: array header + 8 bytes per element (aligned) + memorySize += + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + (long) Double.BYTES * rowSize); + break; + case TEXT: + case STRING: + case BLOB: + case OBJECT: + // Handle object array type: Binary[] is an array of objects + final Binary[] binaryValues = new Binary[rowSize]; + // Calculate memory for Binary array: array header + object references + long binaryArrayMemory = + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( + NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * rowSize); + + if (isValueColumnsNotNull) { + for (int index = 0; index < rowSize; index++) { + final boolean 
isNotNull = + BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + if (isNotNull) { + binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer); + // Calculate memory for each Binary object during deserialization + binaryArrayMemory += binaryValues[index].ramBytesUsed(); + } else { + binaryValues[index] = Binary.EMPTY_VALUE; + // EMPTY_VALUE also has memory cost + binaryArrayMemory += Binary.EMPTY_VALUE.ramBytesUsed(); + } + } + } else { + Arrays.fill(binaryValues, Binary.EMPTY_VALUE); + // Calculate memory for all EMPTY_VALUE + binaryArrayMemory += (long) rowSize * Binary.EMPTY_VALUE.ramBytesUsed(); + } + values[i] = binaryValues; + // Add calculated Binary array memory to total + memorySize += binaryArrayMemory; + break; + default: + throw new UnSupportedDataTypeException( + String.format("data type %s is not supported when convert data at client", types[i])); + } + } + + return new Pair<>(values, memorySize); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java new file mode 100644 index 00000000000..62111cb4129 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util.sorter; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.BitMap; + +/** + * Adapter interface to encapsulate common operations needed for sorting and deduplication. This + * interface allows the sorter to work with both Tablet and InsertTabletStatement. + */ +public interface InsertEventDataAdapter { + + /** + * Get the number of columns. + * + * @return number of columns + */ + int getColumnCount(); + + /** + * Get data type for a specific column. + * + * @param columnIndex column index + * @return data type of the column + */ + TSDataType getDataType(int columnIndex); + + /** + * Get bit maps for null values. + * + * @return array of bit maps, may be null + */ + BitMap[] getBitMaps(); + + /** + * Set bit maps for null values. + * + * @param bitMaps array of bit maps + */ + void setBitMaps(BitMap[] bitMaps); + + /** + * Get value arrays for all columns. + * + * @return array of value arrays (Object[]) + */ + Object[] getValues(); + + /** + * Set value array for a specific column. + * + * @param columnIndex column index + * @param value value array + */ + void setValue(int columnIndex, Object value); + + /** + * Get timestamps array. + * + * @return array of timestamps + */ + long[] getTimestamps(); + + /** + * Set timestamps array. + * + * @param timestamps array of timestamps + */ + void setTimestamps(long[] timestamps); + + /** + * Get row size/count. 
+ * + * @return number of rows + */ + int getRowSize(); + + /** + * Set row size/count. + * + * @param rowSize number of rows + */ + void setRowSize(int rowSize); + + /** + * Get timestamp at a specific row index. + * + * @param rowIndex row index + * @return timestamp value + */ + long getTimestamp(int rowIndex); + + /** + * Get device ID at a specific row index (for table model). + * + * @param rowIndex row index + * @return device ID + */ + IDeviceID getDeviceID(int rowIndex); + + /** + * Check if the DATE type column value is stored as LocalDate[] (Tablet) or int[] (Statement). + * + * @param columnIndex column index + * @return true if DATE type is stored as LocalDate[], false if stored as int[] + */ + boolean isDateStoredAsLocalDate(int columnIndex); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java new file mode 100644 index 00000000000..30f74a22965 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util.sorter; + +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.BitMap; + +/** Adapter for InsertTabletStatement to implement InsertEventDataAdapter interface. */ +public class InsertTabletStatementAdapter implements InsertEventDataAdapter { + + private final InsertTabletStatement statement; + + public InsertTabletStatementAdapter(final InsertTabletStatement statement) { + this.statement = statement; + } + + @Override + public int getColumnCount() { + final Object[] columns = statement.getColumns(); + return columns != null ? columns.length : 0; + } + + @Override + public TSDataType getDataType(int columnIndex) { + final TSDataType[] dataTypes = statement.getDataTypes(); + if (dataTypes != null && columnIndex < dataTypes.length) { + return dataTypes[columnIndex]; + } + return null; + } + + @Override + public BitMap[] getBitMaps() { + return statement.getBitMaps(); + } + + @Override + public void setBitMaps(BitMap[] bitMaps) { + statement.setBitMaps(bitMaps); + } + + @Override + public Object[] getValues() { + return statement.getColumns(); + } + + @Override + public void setValue(int columnIndex, Object value) { + Object[] columns = statement.getColumns(); + if (columns != null && columnIndex < columns.length) { + columns[columnIndex] = value; + } + } + + @Override + public long[] getTimestamps() { + return statement.getTimes(); + } + + @Override + public void setTimestamps(long[] timestamps) { + statement.setTimes(timestamps); + } + + @Override + public int getRowSize() { + return statement.getRowCount(); + } + + @Override + public void setRowSize(int rowSize) { + statement.setRowCount(rowSize); + } + + @Override + public long getTimestamp(int rowIndex) 
{ + long[] times = statement.getTimes(); + if (times != null && rowIndex < times.length) { + return times[rowIndex]; + } + return 0; + } + + @Override + public IDeviceID getDeviceID(int rowIndex) { + return statement.getTableDeviceID(rowIndex); + } + + @Override + public boolean isDateStoredAsLocalDate(int columnIndex) { + // InsertTabletStatement stores DATE as int[] + return false; + } + + public InsertTabletStatement getStatement() { + return statement; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java similarity index 65% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java index 6540bf9855d..46a3fc6df94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java @@ -19,19 +19,20 @@ package org.apache.iotdb.db.pipe.sink.util.sorter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.IMeasurementSchema; import java.time.LocalDate; import java.util.Objects; -public class PipeTabletEventSorter { +public class PipeInsertEventSorter { - protected final Tablet tablet; + protected final InsertEventDataAdapter dataAdapter; protected Integer[] index; protected boolean isSorted = true; @@ -39,8 +40,31 @@ public class PipeTabletEventSorter { protected int[] 
deDuplicatedIndex; protected int deDuplicatedSize; - public PipeTabletEventSorter(final Tablet tablet) { - this.tablet = tablet; + /** + * Constructor for Tablet. + * + * @param tablet the tablet to sort + */ + public PipeInsertEventSorter(final Tablet tablet) { + this.dataAdapter = new TabletAdapter(tablet); + } + + /** + * Constructor for InsertTabletStatement. + * + * @param statement the insert tablet statement to sort + */ + public PipeInsertEventSorter(final InsertTabletStatement statement) { + this.dataAdapter = new InsertTabletStatementAdapter(statement); + } + + /** + * Constructor with adapter (for internal use or advanced scenarios). + * + * @param adapter the data adapter + */ + protected PipeInsertEventSorter(final InsertEventDataAdapter adapter) { + this.dataAdapter = adapter; } // Input: @@ -54,35 +78,42 @@ public class PipeTabletEventSorter { // (Used index: [2(3), 4(0)]) // Col: [6, 1] protected void sortAndMayDeduplicateValuesAndBitMaps() { - int columnIndex = 0; - for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { - final IMeasurementSchema schema = tablet.getSchemas().get(i); - if (schema != null) { + final int columnCount = dataAdapter.getColumnCount(); + BitMap[] bitMaps = dataAdapter.getBitMaps(); + boolean bitMapsModified = false; + + for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) { + final TSDataType dataType = dataAdapter.getDataType(columnIndex); + if (dataType != null) { BitMap deDuplicatedBitMap = null; BitMap originalBitMap = null; - if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] != null) { - originalBitMap = tablet.getBitMaps()[columnIndex]; + if (bitMaps != null && columnIndex < bitMaps.length && bitMaps[columnIndex] != null) { + originalBitMap = bitMaps[columnIndex]; deDuplicatedBitMap = new BitMap(originalBitMap.getSize()); } - tablet.getValues()[columnIndex] = + final Object[] values = dataAdapter.getValues(); + final Object reorderedValue = reorderValueListAndBitMap( - 
tablet.getValues()[columnIndex], - schema.getType(), - originalBitMap, - deDuplicatedBitMap); + values[columnIndex], dataType, columnIndex, originalBitMap, deDuplicatedBitMap); + dataAdapter.setValue(columnIndex, reorderedValue); - if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] != null) { - tablet.getBitMaps()[columnIndex] = deDuplicatedBitMap; + if (bitMaps != null && columnIndex < bitMaps.length && bitMaps[columnIndex] != null) { + bitMaps[columnIndex] = deDuplicatedBitMap; + bitMapsModified = true; } - columnIndex++; } } + + if (bitMapsModified) { + dataAdapter.setBitMaps(bitMaps); + } } protected Object reorderValueListAndBitMap( final Object valueList, final TSDataType dataType, + final int columnIndex, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) { // Older version's sender may contain null values, we need to cover this case @@ -107,13 +138,26 @@ public class PipeTabletEventSorter { } return deDuplicatedIntValues; case DATE: - final LocalDate[] dateValues = (LocalDate[]) valueList; - final LocalDate[] deDuplicatedDateValues = new LocalDate[dateValues.length]; - for (int i = 0; i < deDuplicatedSize; i++) { - deDuplicatedDateValues[i] = - dateValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; + // DATE type: Tablet uses LocalDate[], InsertTabletStatement uses int[] + if (dataAdapter.isDateStoredAsLocalDate(columnIndex)) { + // Tablet: LocalDate[] + final LocalDate[] dateValues = (LocalDate[]) valueList; + final LocalDate[] deDuplicatedDateValues = new LocalDate[dateValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedDateValues[i] = + dateValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; + } + return deDuplicatedDateValues; + } else { + // InsertTabletStatement: int[] + final int[] intDateValues = (int[]) valueList; + final int[] deDuplicatedIntDateValues = new int[intDateValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedIntDateValues[i] = 
+ intDateValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; + } + return deDuplicatedIntDateValues; } - return deDuplicatedDateValues; case INT64: case TIMESTAMP: final long[] longValues = (long[]) valueList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java index 5735b51c6d0..ba034cae3a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.sink.util.sorter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; @@ -31,14 +33,29 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { +public class PipeTableModelTabletEventSorter extends PipeInsertEventSorter { private int initIndexSize; + /** + * Constructor for Tablet. + * + * @param tablet the tablet to sort + */ public PipeTableModelTabletEventSorter(final Tablet tablet) { super(tablet); deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize(); } + /** + * Constructor for InsertTabletStatement. + * + * @param statement the insert tablet statement to sort + */ + public PipeTableModelTabletEventSorter(final InsertTabletStatement statement) { + super(statement); + deDuplicatedSize = statement == null ? 0 : statement.getRowCount(); + } + /** * For the sorting and deduplication needs of the table model tablet, it is done according to the * {@link IDeviceID}. 
For sorting, it is necessary to sort the {@link IDeviceID} first, and then @@ -46,18 +63,19 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { * the same timestamp in different {@link IDeviceID} will not be processed. */ public void sortAndDeduplicateByDevIdTimestamp() { - if (tablet == null || tablet.getRowSize() < 1) { + if (dataAdapter == null || dataAdapter.getRowSize() < 1) { return; } final HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap = new HashMap<>(); - final long[] timestamps = tablet.getTimestamps(); + final long[] timestamps = dataAdapter.getTimestamps(); + final int rowSize = dataAdapter.getRowSize(); - IDeviceID lastDevice = tablet.getDeviceID(0); - long previousTimestamp = tablet.getTimestamp(0); + IDeviceID lastDevice = dataAdapter.getDeviceID(0); + long previousTimestamp = dataAdapter.getTimestamp(0); int lasIndex = 0; - for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { - final IDeviceID deviceID = tablet.getDeviceID(i); + for (int i = 1; i < rowSize; ++i) { + final IDeviceID deviceID = dataAdapter.getDeviceID(i); final long currentTimestamp = timestamps[i]; final int deviceComparison = deviceID.compareTo(lastDevice); if (deviceComparison == 0) { @@ -92,7 +110,7 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { if (!list.isEmpty()) { isSorted = false; } - list.add(new Pair<>(lasIndex, tablet.getRowSize())); + list.add(new Pair<>(lasIndex, rowSize)); if (isSorted && isDeDuplicated) { return; @@ -100,8 +118,8 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { initIndexSize = 0; deDuplicatedSize = 0; - index = new Integer[tablet.getRowSize()]; - deDuplicatedIndex = new int[tablet.getRowSize()]; + index = new Integer[rowSize]; + deDuplicatedIndex = new int[rowSize]; deviceIDToIndexMap.entrySet().stream() .sorted(Map.Entry.comparingByKey()) .forEach( @@ -129,19 +147,22 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter { } private void sortAndDeduplicateValuesAndBitMapsWithTimestamp() { - tablet.setTimestamps( + // TIMESTAMP is not a DATE type, so columnIndex is not relevant here, use -1 + dataAdapter.setTimestamps( (long[]) - reorderValueListAndBitMap(tablet.getTimestamps(), TSDataType.TIMESTAMP, null, null)); + reorderValueListAndBitMap( + dataAdapter.getTimestamps(), TSDataType.TIMESTAMP, -1, null, null)); sortAndMayDeduplicateValuesAndBitMaps(); - tablet.setRowSize(deDuplicatedSize); + dataAdapter.setRowSize(deDuplicatedSize); } private void sortTimestamps(final int startIndex, final int endIndex) { - Arrays.sort(this.index, startIndex, endIndex, Comparator.comparingLong(tablet::getTimestamp)); + Arrays.sort( + this.index, startIndex, endIndex, Comparator.comparingLong(dataAdapter::getTimestamp)); } private void deDuplicateTimestamps(final int startIndex, final int endIndex) { - final long[] timestamps = tablet.getTimestamps(); + final long[] timestamps = dataAdapter.getTimestamps(); long lastTime = timestamps[index[startIndex]]; for (int i = startIndex + 1; i < endIndex; i++) { if (lastTime != (lastTime = timestamps[index[i]])) { @@ -153,12 +174,13 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { /** Sort by time only. 
*/ public void sortByTimestampIfNecessary() { - if (tablet == null || tablet.getRowSize() == 0) { + if (dataAdapter == null || dataAdapter.getRowSize() == 0) { return; } - final long[] timestamps = tablet.getTimestamps(); - for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { + final long[] timestamps = dataAdapter.getTimestamps(); + final int rowSize = dataAdapter.getRowSize(); + for (int i = 1; i < rowSize; ++i) { final long currentTimestamp = timestamps[i]; final long previousTimestamp = timestamps[i - 1]; @@ -172,8 +194,8 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { return; } - index = new Integer[tablet.getRowSize()]; - for (int i = 0, size = tablet.getRowSize(); i < size; i++) { + index = new Integer[rowSize]; + for (int i = 0; i < rowSize; i++) { index[i] = i; } @@ -185,7 +207,8 @@ public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter { } private void sortTimestamps() { - Arrays.sort(this.index, Comparator.comparingLong(tablet::getTimestamp)); - Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize()); + Arrays.sort(this.index, Comparator.comparingLong(dataAdapter::getTimestamp)); + final long[] timestamps = dataAdapter.getTimestamps(); + Arrays.sort(timestamps, 0, dataAdapter.getRowSize()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java index c26f59220f9..2a56b706463 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java @@ -19,25 +19,43 @@ package org.apache.iotdb.db.pipe.sink.util.sorter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + import 
org.apache.tsfile.write.record.Tablet; import java.util.Arrays; import java.util.Comparator; -public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter { +public class PipeTreeModelTabletEventSorter extends PipeInsertEventSorter { + /** + * Constructor for Tablet. + * + * @param tablet the tablet to sort + */ public PipeTreeModelTabletEventSorter(final Tablet tablet) { super(tablet); deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize(); } + /** + * Constructor for InsertTabletStatement. + * + * @param statement the insert tablet statement to sort + */ + public PipeTreeModelTabletEventSorter(final InsertTabletStatement statement) { + super(statement); + deDuplicatedSize = statement == null ? 0 : statement.getRowCount(); + } + public void deduplicateAndSortTimestampsIfNecessary() { - if (tablet == null || tablet.getRowSize() == 0) { + if (dataAdapter == null || dataAdapter.getRowSize() == 0) { return; } - long[] timestamps = tablet.getTimestamps(); - for (int i = 1, size = tablet.getRowSize(); i < size; ++i) { + long[] timestamps = dataAdapter.getTimestamps(); + final int rowSize = dataAdapter.getRowSize(); + for (int i = 1; i < rowSize; ++i) { final long currentTimestamp = timestamps[i]; final long previousTimestamp = timestamps[i - 1]; @@ -54,9 +72,9 @@ public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter { return; } - index = new Integer[tablet.getRowSize()]; - deDuplicatedIndex = new int[tablet.getRowSize()]; - for (int i = 0, size = tablet.getRowSize(); i < size; i++) { + index = new Integer[rowSize]; + deDuplicatedIndex = new int[rowSize]; + for (int i = 0; i < rowSize; i++) { index[i] = i; } @@ -78,14 +96,16 @@ public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter { private void sortTimestamps() { // Index is sorted stably because it is Integer[] - Arrays.sort(index, Comparator.comparingLong(tablet::getTimestamp)); - Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize()); + Arrays.sort(index, 
Comparator.comparingLong(dataAdapter::getTimestamp)); + final long[] timestamps = dataAdapter.getTimestamps(); + Arrays.sort(timestamps, 0, dataAdapter.getRowSize()); } private void deduplicateTimestamps() { deDuplicatedSize = 0; - long[] timestamps = tablet.getTimestamps(); - for (int i = 1, size = tablet.getRowSize(); i < size; i++) { + long[] timestamps = dataAdapter.getTimestamps(); + final int rowSize = dataAdapter.getRowSize(); + for (int i = 1; i < rowSize; i++) { if (timestamps[i] != timestamps[i - 1]) { deDuplicatedIndex[deDuplicatedSize] = i - 1; timestamps[deDuplicatedSize] = timestamps[i - 1]; @@ -94,8 +114,8 @@ public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter { } } - deDuplicatedIndex[deDuplicatedSize] = tablet.getRowSize() - 1; - timestamps[deDuplicatedSize] = timestamps[tablet.getRowSize() - 1]; - tablet.setRowSize(++deDuplicatedSize); + deDuplicatedIndex[deDuplicatedSize] = rowSize - 1; + timestamps[deDuplicatedSize] = timestamps[rowSize - 1]; + dataAdapter.setRowSize(++deDuplicatedSize); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java new file mode 100644 index 00000000000..b200127a5d4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util.sorter; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.List; + +/** Adapter for Tablet to implement InsertEventDataAdapter interface. */ +public class TabletAdapter implements InsertEventDataAdapter { + + private final Tablet tablet; + + public TabletAdapter(final Tablet tablet) { + this.tablet = tablet; + } + + @Override + public int getColumnCount() { + final Object[] values = tablet.getValues(); + return values != null ? values.length : 0; + } + + @Override + public TSDataType getDataType(int columnIndex) { + final List<IMeasurementSchema> schemas = tablet.getSchemas(); + if (schemas != null && columnIndex < schemas.size()) { + final IMeasurementSchema schema = schemas.get(columnIndex); + return schema != null ? 
schema.getType() : null; + } + return null; + } + + @Override + public BitMap[] getBitMaps() { + return tablet.getBitMaps(); + } + + @Override + public void setBitMaps(BitMap[] bitMaps) { + tablet.setBitMaps(bitMaps); + } + + @Override + public Object[] getValues() { + return tablet.getValues(); + } + + @Override + public void setValue(int columnIndex, Object value) { + tablet.getValues()[columnIndex] = value; + } + + @Override + public long[] getTimestamps() { + return tablet.getTimestamps(); + } + + @Override + public void setTimestamps(long[] timestamps) { + tablet.setTimestamps(timestamps); + } + + @Override + public int getRowSize() { + return tablet.getRowSize(); + } + + @Override + public void setRowSize(int rowSize) { + tablet.setRowSize(rowSize); + } + + @Override + public long getTimestamp(int rowIndex) { + return tablet.getTimestamp(rowIndex); + } + + @Override + public IDeviceID getDeviceID(int rowIndex) { + return tablet.getDeviceID(rowIndex); + } + + @Override + public boolean isDateStoredAsLocalDate(int columnIndex) { + return true; + } + + public Tablet getTablet() { + return tablet; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index 1aae871ea0c..d8786e33959 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -839,6 +839,16 @@ public abstract class InsertBaseStatement extends Statement implements Accountab return ramBytesUsed; } + /** + * Set the pre-calculated memory size. This is used when memory size is calculated during + * deserialization to avoid recalculation. 
+ * + * @param ramBytesUsed the calculated memory size in bytes + */ + public void setRamBytesUsed(long ramBytesUsed) { + this.ramBytesUsed = ramBytesUsed; + } + private long shallowSizeOfList(List<?> list) { return Objects.nonNull(list) ? UpdateDetailContainer.LIST_SIZE diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index d2c1d2a783a..12369d81bfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.utils.CommonUtils; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; @@ -58,6 +59,8 @@ import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.LocalDate; import java.util.ArrayList; @@ -69,11 +72,22 @@ import java.util.Map; import java.util.Objects; public class InsertTabletStatement extends InsertBaseStatement implements ISchemaValidation { + private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletStatement.class); + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class); private static final String DATATYPE_UNSUPPORTED = "Data type %s is not 
supported."; + /** + * Get the instance size of InsertTabletStatement for memory calculation. + * + * @return instance size in bytes + */ + public static long getInstanceSize() { + return INSTANCE_SIZE; + } + protected long[] times; // times should be sorted. It is done in the session API. protected BitMap[] nullBitMaps; protected Object[] columns; @@ -701,6 +715,189 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem } } + /** + * Convert this InsertTabletStatement to Tablet. This method constructs a Tablet object from this + * statement, converting all necessary fields. All arrays are copied to rowSize length to ensure + * immutability. + * + * @return Tablet object + * @throws MetadataException if conversion fails + */ + public Tablet convertToTablet() throws MetadataException { + try { + // Get deviceId/tableName from devicePath + final String deviceIdOrTableName = + this.getDevicePath() != null ? this.getDevicePath().getFullPath() : ""; + + // Get schemas from measurementSchemas + final MeasurementSchema[] measurementSchemas = this.getMeasurementSchemas(); + final String[] measurements = this.getMeasurements(); + final TSDataType[] dataTypes = this.getDataTypes(); + // If measurements and dataTypes are not null, use measurements.length as the standard length + final int originalSchemaSize = measurements != null ? 
measurements.length : 0; + + // Build schemas and track valid column indices (skip null columns) + // measurements and dataTypes being null is standard - skip those columns + final List<IMeasurementSchema> schemas = new ArrayList<>(); + final List<Integer> validColumnIndices = new ArrayList<>(); + for (int i = 0; i < originalSchemaSize; i++) { + if (dataTypes != null && measurements[i] != null && dataTypes[i] != null) { + // Create MeasurementSchema if not present + schemas.add(new MeasurementSchema(measurements[i], dataTypes[i])); + validColumnIndices.add(i); + } + // Skip null columns - don't add to schemas or validColumnIndices + } + + final int schemaSize = schemas.size(); + + // Get columnTypes (for table model) - only for valid columns + final TsTableColumnCategory[] columnCategories = this.getColumnCategories(); + final List<ColumnCategory> tabletColumnTypes = new ArrayList<>(); + if (columnCategories != null && columnCategories.length > 0) { + for (final int validIndex : validColumnIndices) { + if (columnCategories[validIndex] != null) { + tabletColumnTypes.add(columnCategories[validIndex].toTsFileColumnType()); + } else { + tabletColumnTypes.add(ColumnCategory.FIELD); + } + } + } else { + // Default to FIELD for all valid columns if not specified + for (int i = 0; i < schemaSize; i++) { + tabletColumnTypes.add(ColumnCategory.FIELD); + } + } + + // Get timestamps - always copy to ensure immutability + final long[] times = this.getTimes(); + final int rowSize = this.getRowCount(); + final long[] timestamps; + if (times != null && times.length >= rowSize && rowSize > 0) { + timestamps = new long[rowSize]; + System.arraycopy(times, 0, timestamps, 0, rowSize); + } else { + LOGGER.warn( + "Times array is null or too small. times.length={}, rowSize={}, deviceId={}", + times != null ? 
times.length : 0, + rowSize, + deviceIdOrTableName); + timestamps = new long[0]; + } + + // Get values - convert Statement columns to Tablet format, only for valid columns + // All arrays are truncated/copied to rowSize length + final Object[] statementColumns = this.getColumns(); + final Object[] tabletValues = new Object[schemaSize]; + if (statementColumns != null && statementColumns.length > 0) { + for (int i = 0; i < validColumnIndices.size(); i++) { + final int originalIndex = validColumnIndices.get(i); + if (statementColumns[originalIndex] != null && dataTypes[originalIndex] != null) { + tabletValues[i] = + convertColumnToTablet( + statementColumns[originalIndex], dataTypes[originalIndex], rowSize); + } else { + tabletValues[i] = null; + } + } + } + + // Get bitMaps - copy and truncate to rowSize, only for valid columns + final BitMap[] originalBitMaps = this.getBitMaps(); + final BitMap[] bitMaps; + if (originalBitMaps != null && originalBitMaps.length > 0) { + bitMaps = new BitMap[schemaSize]; + for (int i = 0; i < validColumnIndices.size(); i++) { + final int originalIndex = validColumnIndices.get(i); + if (originalBitMaps[originalIndex] != null) { + // Create a new BitMap truncated to rowSize + final byte[] truncatedBytes = + originalBitMaps[originalIndex].getTruncatedByteArray(rowSize); + bitMaps[i] = new BitMap(rowSize, truncatedBytes); + } else { + bitMaps[i] = null; + } + } + } else { + bitMaps = null; + } + + // Create Tablet using the full constructor + // Tablet(String tableName, List<IMeasurementSchema> schemas, List<ColumnCategory> + // columnTypes, + // long[] timestamps, Object[] values, BitMap[] bitMaps, int rowSize) + return new Tablet( + deviceIdOrTableName, + schemas, + tabletColumnTypes, + timestamps, + tabletValues, + bitMaps, + rowSize); + } catch (final Exception e) { + throw new MetadataException("Failed to convert InsertTabletStatement to Tablet", e); + } + } + + /** + * Convert a single column value from Statement format to Tablet 
format. Statement uses primitive + * arrays (e.g., int[], long[], float[]), while Tablet may need different format. All arrays are + * copied and truncated to rowSize length to ensure immutability - even if the original array is + * modified, the converted array remains unchanged. + * + * @param columnValue column value from Statement (primitive array) + * @param dataType data type of the column + * @param rowSize number of rows to copy (truncate to this length) + * @return column value in Tablet format (copied and truncated array) + */ + private Object convertColumnToTablet( + final Object columnValue, final TSDataType dataType, final int rowSize) { + + if (columnValue == null) { + return null; + } + + if (TSDataType.DATE.equals(dataType)) { + final int[] values = (int[]) columnValue; + // Copy and truncate to rowSize + final int[] copiedValues = Arrays.copyOf(values, Math.min(values.length, rowSize)); + final LocalDate[] localDateValue = new LocalDate[rowSize]; + for (int i = 0; i < copiedValues.length; i++) { + localDateValue[i] = DateUtils.parseIntToLocalDate(copiedValues[i]); + } + // Fill remaining with null if needed + for (int i = copiedValues.length; i < rowSize; i++) { + localDateValue[i] = null; + } + return localDateValue; + } + + // For primitive arrays, copy and truncate to rowSize + if (columnValue instanceof boolean[]) { + final boolean[] original = (boolean[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } else if (columnValue instanceof int[]) { + final int[] original = (int[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } else if (columnValue instanceof long[]) { + final long[] original = (long[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } else if (columnValue instanceof float[]) { + final float[] original = (float[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } else if (columnValue 
instanceof double[]) { + final double[] original = (double[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } else if (columnValue instanceof Binary[]) { + // For Binary arrays, create a new array and copy references, truncate to rowSize + final Binary[] original = (Binary[]) columnValue; + return Arrays.copyOf(original, Math.min(original.length, rowSize)); + } + + // For other types, return as-is (should not happen for standard types) + return columnValue; + } + @Override public String toString() { final int size = CommonDescriptor.getInstance().getConfig().getPathLogMaxSize(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 1a122c63d4f..0cc4470882e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -437,7 +437,7 @@ public class PipeDataNodeThriftRequestTest { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { t.serialize(outputStream); - ReadWriteIOUtils.write(false, outputStream); + ReadWriteIOUtils.write(true, outputStream); tabletBuffers.add( ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); tabletDataBase.add("test"); @@ -459,7 +459,7 @@ public class PipeDataNodeThriftRequestTest { new byte[] {'a', 'b'}, deserializedReq.getBinaryReqs().get(0).getByteBuffer().array()); Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode()); Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet()); - Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); + 
Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned()); Assert.assertEquals("test", deserializedReq.getBinaryReqs().get(0).getDataBaseName()); Assert.assertEquals("test", deserializedReq.getTabletReqs().get(0).getDataBaseName()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java new file mode 100644 index 00000000000..7c13d9764b3 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; +import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class PipeStatementEventSorterTest { + + @Test + public void testTreeModelDeduplicateAndSort() throws Exception { + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.device", schemaList, 30); + + long timestamp = 300; + for (long i = 0; i < 10; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp + i); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, timestamp + i); + } + + rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp - i); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, timestamp - i); + } + + rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, timestamp); + } + } + + Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 30; i++) { + 
indices.add((int) tablet.getTimestamp(i)); + } + + Assert.assertFalse(tablet.isSorted()); + + // Convert Tablet to Statement + InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null); + + // Sort using Statement + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertEquals(indices.size(), statement.getRowCount()); + + final long[] timestamps = Arrays.copyOfRange(statement.getTimes(), 0, statement.getRowCount()); + final Object[] columns = statement.getColumns(); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) columns[i], 0, statement.getRowCount())); + } + + for (int i = 1; i < statement.getRowCount(); ++i) { + Assert.assertTrue(timestamps[i] > timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) columns[j])[i] > ((long[]) columns[j])[i - 1]); + } + } + } + + @Test + public void testTreeModelDeduplicate() throws Exception { + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + final Tablet tablet = new Tablet("root.sg.device", schemaList, 10); + + final long timestamp = 300; + for (long i = 0; i < 10; i++) { + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + tablet.addValue( + schemaList.get(s).getMeasurementName(), + rowIndex, + (i + s) % 3 != 0 ? 
timestamp + i : null); + } + } + + final Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 10; i++) { + indices.add((int) tablet.getTimestamp(i)); + } + + Assert.assertTrue(tablet.isSorted()); + + // Convert Tablet to Statement + InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null); + + // Sort using Statement + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertEquals(indices.size(), statement.getRowCount()); + + final long[] timestamps = Arrays.copyOfRange(statement.getTimes(), 0, statement.getRowCount()); + final Object[] columns = statement.getColumns(); + Assert.assertEquals(timestamps[0] + 8, ((long[]) columns[0])[0]); + for (int i = 1; i < 3; ++i) { + Assert.assertEquals(timestamps[0] + 9, ((long[]) columns[i])[0]); + } + } + + @Test + public void testTreeModelSort() throws Exception { + List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.device", schemaList, 30); + + for (long i = 0; i < 10; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, (long) rowIndex + 2); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, (long) rowIndex + 2); + } + + rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, rowIndex); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, (long) rowIndex); + } + + rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, (long) rowIndex - 2); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, (long) rowIndex - 2); + } + } + + Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 30; i++) { + indices.add((int) 
tablet.getTimestamp(i)); + } + + Assert.assertFalse(tablet.isSorted()); + + long[] timestamps = Arrays.copyOfRange(tablet.getTimestamps(), 0, tablet.getRowSize()); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) tablet.getValues()[i], 0, tablet.getRowSize())); + } + + for (int i = 1; i < tablet.getRowSize(); ++i) { + Assert.assertTrue(timestamps[i] != timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertNotEquals((long) tablet.getValue(i, j), (long) tablet.getValue(i - 1, j)); + } + } + + // Convert Tablet to Statement + InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null); + + // Sort using Statement + new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertEquals(indices.size(), statement.getRowCount()); + + timestamps = Arrays.copyOfRange(statement.getTimes(), 0, statement.getRowCount()); + final Object[] columns = statement.getColumns(); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) columns[i], 0, statement.getRowCount())); + } + + for (int i = 1; i < statement.getRowCount(); ++i) { + Assert.assertTrue(timestamps[i] > timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) columns[j])[i] > ((long[]) columns[j])[i - 1]); + } + } + } + + @Test + public void testTableModelDeduplicateAndSort() throws Exception { + doTableModelTest(true, true); + } + + @Test + public void testTableModelDeduplicate() throws Exception { + doTableModelTest(true, false); + } + + @Test + public void testTableModelSort() throws Exception { + doTableModelTest(false, true); + } + + @Test + public void testTableModelSort1() throws Exception { + doTableModelTest1(); + } + + public void doTableModelTest(final boolean hasDuplicates, final boolean isUnSorted) + throws Exception { + final Tablet tablet = + PipeTabletEventSorterTest.generateTablet("test", 10, hasDuplicates, 
isUnSorted); + + // Convert Tablet to Statement + InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db"); + + // Sort using Statement + new PipeTableModelTabletEventSorter(statement).sortAndDeduplicateByDevIdTimestamp(); + + long[] timestamps = statement.getTimes(); + final Object[] columns = statement.getColumns(); + for (int i = 1; i < statement.getRowCount(); i++) { + long time = timestamps[i]; + Assert.assertTrue(time > timestamps[i - 1]); + Assert.assertEquals( + ((Binary[]) columns[0])[i], + new Binary(String.valueOf(i / 100).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(((long[]) columns[1])[i], (long) i); + Assert.assertEquals(((float[]) columns[2])[i], i * 1.0f, 0.001f); + Assert.assertEquals( + ((Binary[]) columns[3])[i], + new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(((long[]) columns[4])[i], (long) i); + Assert.assertEquals(((int[]) columns[5])[i], i); + Assert.assertEquals(((double[]) columns[6])[i], i * 0.1, 0.0001); + // DATE is stored as int[] in Statement, not LocalDate[] + LocalDate expectedDate = PipeTabletEventSorterTest.getDate(i); + int expectedDateInt = + org.apache.tsfile.utils.DateUtils.parseDateExpressionToInt(expectedDate); + Assert.assertEquals(((int[]) columns[7])[i], expectedDateInt); + Assert.assertEquals( + ((Binary[]) columns[8])[i], + new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + } + } + + public void doTableModelTest1() throws Exception { + final Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, false, true); + + // Convert Tablet to Statement + InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db"); + + // Sort using Statement + new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary(); + + long[] timestamps = statement.getTimes(); + final Object[] columns = statement.getColumns(); + for (int i = 1; i < statement.getRowCount(); i++) { + long time = 
timestamps[i]; + Assert.assertTrue(time > timestamps[i - 1]); + Assert.assertEquals( + ((Binary[]) columns[0])[i], + new Binary(String.valueOf(i / 100).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(((long[]) columns[1])[i], (long) i); + Assert.assertEquals(((float[]) columns[2])[i], i * 1.0f, 0.001f); + Assert.assertEquals( + ((Binary[]) columns[3])[i], + new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + Assert.assertEquals(((long[]) columns[4])[i], (long) i); + Assert.assertEquals(((int[]) columns[5])[i], i); + Assert.assertEquals(((double[]) columns[6])[i], i * 0.1, 0.0001); + // DATE is stored as int[] in Statement, not LocalDate[] + LocalDate expectedDate = PipeTabletEventSorterTest.getDate(i); + int expectedDateInt = + org.apache.tsfile.utils.DateUtils.parseDateExpressionToInt(expectedDate); + Assert.assertEquals(((int[]) columns[7])[i], expectedDateInt); + Assert.assertEquals( + ((Binary[]) columns[8])[i], + new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8))); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java new file mode 100644 index 00000000000..410afc76130 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.List; + +public class TabletStatementConverterTest { + + @Test + public void testConvertStatementToTabletTreeModel() throws MetadataException { + final int columnCount = 1000; + final int rowCount = 100; + final String deviceName = "root.sg.device"; + final boolean isAligned = true; + + // Generate Tablet and construct Statement from it + final Tablet originalTablet = generateTreeModelTablet(deviceName, columnCount, rowCount); + final InsertTabletStatement statement = + new InsertTabletStatement(originalTablet, isAligned, null); + + // Convert Statement to Tablet + final Tablet convertedTablet = statement.convertToTablet(); + + // Verify conversion + assertTabletsEqual(originalTablet, convertedTablet); + } + +
@Test + public void testConvertStatementToTabletTableModel() throws MetadataException { + final int columnCount = 1000; + final int rowCount = 100; + final String tableName = "table1"; + final String databaseName = "test_db"; + final boolean isAligned = false; + + // Generate Tablet and construct Statement from it + final Tablet originalTablet = generateTableModelTablet(tableName, columnCount, rowCount); + final InsertTabletStatement statement = + new InsertTabletStatement(originalTablet, isAligned, databaseName); + + // Convert Statement to Tablet + final Tablet convertedTablet = statement.convertToTablet(); + + // Verify conversion + assertTabletsEqual(originalTablet, convertedTablet); + } + + @Test + public void testDeserializeStatementFromTabletFormat() throws IOException, MetadataException { + final int columnCount = 1000; + final int rowCount = 100; + final String deviceName = "root.sg.device"; + + // Generate test Tablet + final Tablet originalTablet = generateTreeModelTablet(deviceName, columnCount, rowCount); + + // Serialize Tablet to ByteBuffer + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + // Then serialize the tablet + originalTablet.serialize(outputStream); + // Write isAligned at the end + final boolean isAligned = true; + ReadWriteIOUtils.write(isAligned, outputStream); + + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + + // Deserialize Statement from Tablet format + final InsertTabletStatement statement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer); + + // Verify basic information + Assert.assertEquals(deviceName, statement.getDevicePath().getFullPath()); + Assert.assertEquals(rowCount, statement.getRowCount()); + Assert.assertEquals(columnCount, statement.getMeasurements().length); + Assert.assertEquals(isAligned, statement.isAligned()); + + // Verify data 
by converting Statement back to Tablet + final Tablet convertedTablet = statement.convertToTablet(); + assertTabletsEqual(originalTablet, convertedTablet); + } + + @Test + public void testRoundTripConversionTreeModel() throws MetadataException, IOException { + final int columnCount = 1000; + final int rowCount = 100; + final String deviceName = "root.sg.device"; + + // Generate original Tablet + final Tablet originalTablet = generateTreeModelTablet(deviceName, columnCount, rowCount); + + // Serialize Tablet to ByteBuffer + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + originalTablet.serialize(outputStream); + final boolean isAligned = true; + ReadWriteIOUtils.write(isAligned, outputStream); + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + + // Deserialize to Statement + final InsertTabletStatement statement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer); + // Convert Statement back to Tablet + final Tablet convertedTablet = statement.convertToTablet(); + + // Verify round trip + assertTabletsEqual(originalTablet, convertedTablet); + } + + @Test + public void testRoundTripConversionTableModel() throws MetadataException { + final int columnCount = 1000; + final int rowCount = 100; + final String tableName = "table1"; + final String databaseName = "test_db"; + final boolean isAligned = false; + + // Generate original Tablet for table model + final Tablet originalTablet = generateTableModelTablet(tableName, columnCount, rowCount); + + // Construct Statement from Tablet + final InsertTabletStatement originalStatement = + new InsertTabletStatement(originalTablet, isAligned, databaseName); + + // Convert Statement to Tablet + final Tablet convertedTablet = originalStatement.convertToTablet(); + + // Convert Tablet back to Statement + final InsertTabletStatement convertedStatement = + 
new InsertTabletStatement(convertedTablet, isAligned, databaseName); + + // Verify round trip: original Tablet should equal converted Tablet + assertTabletsEqual(originalTablet, convertedTablet); + } + + /** + * Generate a Tablet for tree model with all data types and specified number of columns and rows. + * + * @param deviceName device name + * @param columnCount number of columns + * @param rowCount number of rows + * @return Tablet with test data + */ + private Tablet generateTreeModelTablet( + final String deviceName, final int columnCount, final int rowCount) { + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + final TSDataType[] dataTypes = new TSDataType[columnCount]; + final String[] measurementNames = new String[columnCount]; + final Object[] columnData = new Object[columnCount]; + + // Create schemas and generate test data + for (int col = 0; col < columnCount; col++) { + final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length]; + final String measurementName = "col_" + col + "_" + dataType.name(); + schemaList.add(new MeasurementSchema(measurementName, dataType)); + dataTypes[col] = dataType; + measurementNames[col] = measurementName; + + // Generate test data for this column + switch (dataType) { + case BOOLEAN: + final boolean[] boolValues = new boolean[rowCount]; + for (int row = 0; row < rowCount; row++) { + boolValues[row] = (row + col) % 2 == 0; + } + columnData[col] = boolValues; + break; + case INT32: + final int[] intValues = new int[rowCount]; + for (int row = 0; row < rowCount; row++) { + intValues[row] = row * 100 + col; + } + columnData[col] = intValues; + break; + case DATE: + final LocalDate[] dateValues = new LocalDate[rowCount]; + for (int row = 0; row < rowCount; row++) { + // Generate valid dates starting from 2024-01-01 + dateValues[row] = LocalDate.of(2024, 1, 1).plusDays((row + col) % 365); + } + columnData[col] = dateValues; + break; + case INT64: + case TIMESTAMP: + final long[] longValues = new 
long[rowCount]; + for (int row = 0; row < rowCount; row++) { + longValues[row] = (long) row * 1000L + col; + } + columnData[col] = longValues; + break; + case FLOAT: + final float[] floatValues = new float[rowCount]; + for (int row = 0; row < rowCount; row++) { + floatValues[row] = row * 1.5f + col * 0.1f; + } + columnData[col] = floatValues; + break; + case DOUBLE: + final double[] doubleValues = new double[rowCount]; + for (int row = 0; row < rowCount; row++) { + doubleValues[row] = row * 2.5 + col * 0.01; + } + columnData[col] = doubleValues; + break; + case TEXT: + case STRING: + case BLOB: + final Binary[] binaryValues = new Binary[rowCount]; + for (int row = 0; row < rowCount; row++) { + binaryValues[row] = new Binary(("value_row_" + row + "_col_" + col).getBytes()); + } + columnData[col] = binaryValues; + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } + + // Create and fill tablet + final Tablet tablet = new Tablet(deviceName, schemaList, rowCount); + final long[] times = new long[rowCount]; + for (int row = 0; row < rowCount; row++) { + times[row] = row * 1000L; + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, times[row]); + for (int col = 0; col < columnCount; col++) { + final TSDataType dataType = dataTypes[col]; + final Object data = columnData[col]; + switch (dataType) { + case BOOLEAN: + tablet.addValue(measurementNames[col], rowIndex, ((boolean[]) data)[row]); + break; + case INT32: + tablet.addValue(measurementNames[col], rowIndex, ((int[]) data)[row]); + break; + case DATE: + tablet.addValue(measurementNames[col], rowIndex, ((LocalDate[]) data)[row]); + break; + case INT64: + case TIMESTAMP: + tablet.addValue(measurementNames[col], rowIndex, ((long[]) data)[row]); + break; + case FLOAT: + tablet.addValue(measurementNames[col], rowIndex, ((float[]) data)[row]); + break; + case DOUBLE: + tablet.addValue(measurementNames[col], rowIndex, ((double[]) data)[row]); + break; + 
case TEXT: + case STRING: + case BLOB: + tablet.addValue(measurementNames[col], rowIndex, ((Binary[]) data)[row]); + break; + } + } + } + + return tablet; + } + + /** + * Generate a Tablet for table model with all data types and specified number of columns and rows. + * + * @param tableName table name + * @param columnCount number of columns + * @param rowCount number of rows + * @return Tablet with test data + */ + private Tablet generateTableModelTablet( + final String tableName, final int columnCount, final int rowCount) { + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + final TSDataType[] dataTypes = new TSDataType[columnCount]; + final String[] measurementNames = new String[columnCount]; + final List<ColumnCategory> columnTypes = new ArrayList<>(); + final Object[] columnData = new Object[columnCount]; + + // Create schemas and generate test data + for (int col = 0; col < columnCount; col++) { + final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length]; + final String measurementName = "col_" + col + "_" + dataType.name(); + schemaList.add(new MeasurementSchema(measurementName, dataType)); + dataTypes[col] = dataType; + measurementNames[col] = measurementName; + // For table model, all columns are FIELD (can be TAG/ATTRIBUTE/FIELD, but we use FIELD for + // simplicity) + columnTypes.add(ColumnCategory.FIELD); + + // Generate test data for this column + switch (dataType) { + case BOOLEAN: + final boolean[] boolValues = new boolean[rowCount]; + for (int row = 0; row < rowCount; row++) { + boolValues[row] = (row + col) % 2 == 0; + } + columnData[col] = boolValues; + break; + case INT32: + final int[] intValues = new int[rowCount]; + for (int row = 0; row < rowCount; row++) { + intValues[row] = row * 100 + col; + } + columnData[col] = intValues; + break; + case DATE: + final LocalDate[] dateValues = new LocalDate[rowCount]; + for (int row = 0; row < rowCount; row++) { + // Generate valid dates starting from 2024-01-01 + 
dateValues[row] = LocalDate.of(2024, 1, 1).plusDays((row + col) % 365); + } + columnData[col] = dateValues; + break; + case INT64: + case TIMESTAMP: + final long[] longValues = new long[rowCount]; + for (int row = 0; row < rowCount; row++) { + longValues[row] = (long) row * 1000L + col; + } + columnData[col] = longValues; + break; + case FLOAT: + final float[] floatValues = new float[rowCount]; + for (int row = 0; row < rowCount; row++) { + floatValues[row] = row * 1.5f + col * 0.1f; + } + columnData[col] = floatValues; + break; + case DOUBLE: + final double[] doubleValues = new double[rowCount]; + for (int row = 0; row < rowCount; row++) { + doubleValues[row] = row * 2.5 + col * 0.01; + } + columnData[col] = doubleValues; + break; + case TEXT: + case STRING: + case BLOB: + final Binary[] binaryValues = new Binary[rowCount]; + for (int row = 0; row < rowCount; row++) { + binaryValues[row] = new Binary(("value_row_" + row + "_col_" + col).getBytes()); + } + columnData[col] = binaryValues; + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } + + // Create tablet using table model constructor: Tablet(String, List<String>, List<TSDataType>, + // List<ColumnCategory>, int) + final List<String> measurementNameList = IMeasurementSchema.getMeasurementNameList(schemaList); + final List<TSDataType> dataTypeList = IMeasurementSchema.getDataTypeList(schemaList); + final Tablet tablet = + new Tablet(tableName, measurementNameList, dataTypeList, columnTypes, rowCount); + tablet.initBitMaps(); + + // Fill tablet with data + final long[] times = new long[rowCount]; + for (int row = 0; row < rowCount; row++) { + times[row] = row * 1000L; + final int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, times[row]); + for (int col = 0; col < columnCount; col++) { + final TSDataType dataType = dataTypes[col]; + final Object data = columnData[col]; + switch (dataType) { + case BOOLEAN: + 
tablet.addValue(measurementNames[col], rowIndex, ((boolean[]) data)[row]); + break; + case INT32: + tablet.addValue(measurementNames[col], rowIndex, ((int[]) data)[row]); + break; + case DATE: + tablet.addValue(measurementNames[col], rowIndex, ((LocalDate[]) data)[row]); + break; + case INT64: + case TIMESTAMP: + tablet.addValue(measurementNames[col], rowIndex, ((long[]) data)[row]); + break; + case FLOAT: + tablet.addValue(measurementNames[col], rowIndex, ((float[]) data)[row]); + break; + case DOUBLE: + tablet.addValue(measurementNames[col], rowIndex, ((double[]) data)[row]); + break; + case TEXT: + case STRING: + case BLOB: + tablet.addValue(measurementNames[col], rowIndex, ((Binary[]) data)[row]); + break; + } + } + tablet.setRowSize(rowIndex + 1); + } + + return tablet; + } + + /** + * Check if two Tablets are equal in all aspects. + * + * @param expected expected Tablet + * @param actual actual Tablet + */ + private void assertTabletsEqual(final Tablet expected, final Tablet actual) { + Assert.assertEquals(expected.getDeviceId(), actual.getDeviceId()); + Assert.assertEquals(expected.getRowSize(), actual.getRowSize()); + Assert.assertEquals(expected.getSchemas().size(), actual.getSchemas().size()); + + // Verify timestamps + final long[] expectedTimes = expected.getTimestamps(); + final long[] actualTimes = actual.getTimestamps(); + Assert.assertArrayEquals(expectedTimes, actualTimes); + + // Verify each column + final int columnCount = expected.getSchemas().size(); + final int rowCount = expected.getRowSize(); + final Object[] expectedValues = expected.getValues(); + final Object[] actualValues = actual.getValues(); + + for (int col = 0; col < columnCount; col++) { + final IMeasurementSchema schema = expected.getSchemas().get(col); + final TSDataType dataType = schema.getType(); + final Object expectedColumn = expectedValues[col]; + final Object actualColumn = actualValues[col]; + + Assert.assertNotNull(actualColumn); + + // Verify each row in this column + 
for (int row = 0; row < rowCount; row++) { + switch (dataType) { + case BOOLEAN: + final boolean expectedBool = ((boolean[]) expectedColumn)[row]; + final boolean actualBool = ((boolean[]) actualColumn)[row]; + Assert.assertEquals(expectedBool, actualBool); + break; + case INT32: + final int expectedInt = ((int[]) expectedColumn)[row]; + final int actualInt = ((int[]) actualColumn)[row]; + Assert.assertEquals(expectedInt, actualInt); + break; + case DATE: + final LocalDate expectedDate = ((LocalDate[]) expectedColumn)[row]; + final LocalDate actualDate = ((LocalDate[]) actualColumn)[row]; + Assert.assertEquals(expectedDate, actualDate); + break; + case INT64: + case TIMESTAMP: + final long expectedLong = ((long[]) expectedColumn)[row]; + final long actualLong = ((long[]) actualColumn)[row]; + Assert.assertEquals(expectedLong, actualLong); + break; + case FLOAT: + final float expectedFloat = ((float[]) expectedColumn)[row]; + final float actualFloat = ((float[]) actualColumn)[row]; + Assert.assertEquals(expectedFloat, actualFloat, 0.0001f); + break; + case DOUBLE: + final double expectedDouble = ((double[]) expectedColumn)[row]; + final double actualDouble = ((double[]) actualColumn)[row]; + Assert.assertEquals(expectedDouble, actualDouble, 0.0001); + break; + case TEXT: + case STRING: + case BLOB: + final Binary expectedBinary = ((Binary[]) expectedColumn)[row]; + final Binary actualBinary = ((Binary[]) actualColumn)[row]; + Assert.assertNotNull(actualBinary); + Assert.assertEquals(expectedBinary, actualBinary); + break; + } + } + } + } + + /** + * Check if a Tablet and an InsertTabletStatement contain the same data. + * + * @param tablet Tablet + * @param statement InsertTabletStatement + */ + private void assertTabletAndStatementEqual( + final Tablet tablet, final InsertTabletStatement statement) { + Assert.assertEquals( + tablet.getDeviceId(), + statement.getDevicePath() != null ? 
statement.getDevicePath().getFullPath() : null); + Assert.assertEquals(tablet.getRowSize(), statement.getRowCount()); + Assert.assertEquals(tablet.getSchemas().size(), statement.getMeasurements().length); + + // Verify timestamps + Assert.assertArrayEquals(tablet.getTimestamps(), statement.getTimes()); + + // Verify each column + final int columnCount = tablet.getSchemas().size(); + final int rowCount = tablet.getRowSize(); + final Object[] tabletValues = tablet.getValues(); + final Object[] statementColumns = statement.getColumns(); + + for (int col = 0; col < columnCount; col++) { + final TSDataType dataType = tablet.getSchemas().get(col).getType(); + final Object tabletColumn = tabletValues[col]; + final Object statementColumn = statementColumns[col]; + + Assert.assertNotNull(statementColumn); + + // Verify each row + for (int row = 0; row < rowCount; row++) { + switch (dataType) { + case BOOLEAN: + final boolean tabletBool = ((boolean[]) tabletColumn)[row]; + final boolean statementBool = ((boolean[]) statementColumn)[row]; + Assert.assertEquals(tabletBool, statementBool); + break; + case INT32: + final int tabletInt = ((int[]) tabletColumn)[row]; + final int statementInt = ((int[]) statementColumn)[row]; + Assert.assertEquals(tabletInt, statementInt); + break; + case DATE: + // DATE type: Tablet uses LocalDate[], Statement uses int[] (YYYYMMDD format) + final LocalDate tabletDate = ((LocalDate[]) tabletColumn)[row]; + final int statementDateInt = ((int[]) statementColumn)[row]; + // Convert LocalDate to int (YYYYMMDD format) for comparison + final int tabletDateInt = DateUtils.parseDateExpressionToInt(tabletDate); + Assert.assertEquals(tabletDateInt, statementDateInt); + break; + case INT64: + case TIMESTAMP: + final long tabletLong = ((long[]) tabletColumn)[row]; + final long statementLong = ((long[]) statementColumn)[row]; + Assert.assertEquals(tabletLong, statementLong); + break; + case FLOAT: + final float tabletFloat = ((float[]) tabletColumn)[row]; + 
final float statementFloat = ((float[]) statementColumn)[row]; + Assert.assertEquals(tabletFloat, statementFloat, 0.0001f); + break; + case DOUBLE: + final double tabletDouble = ((double[]) tabletColumn)[row]; + final double statementDouble = ((double[]) statementColumn)[row]; + Assert.assertEquals(tabletDouble, statementDouble, 0.0001); + break; + case TEXT: + case STRING: + case BLOB: + final Binary tabletBinary = ((Binary[]) tabletColumn)[row]; + final Binary statementBinary = ((Binary[]) statementColumn)[row]; + Assert.assertNotNull(statementBinary); + Assert.assertEquals(tabletBinary, statementBinary); + break; + } + } + } + } + + // Define all supported data types + private static final TSDataType[] ALL_DATA_TYPES = { + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.TIMESTAMP, + TSDataType.DATE, + TSDataType.BLOB, + TSDataType.STRING + }; +}
