This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch use_pam_for_insert_tablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/use_pam_for_insert_tablet by
this push:
new c065a81e7dd add pam support for insert tablet
c065a81e7dd is described below
commit c065a81e7dd310eca39fbc8d9e7f222be704a8e8
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Mar 31 10:07:25 2025 +0800
add pam support for insert tablet
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 16 +-
.../dataregion/DataExecutionVisitor.java | 4 +-
.../tablet/parser/TabletInsertionEventParser.java | 154 +++++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 3 +-
.../PipeConvertedInsertTabletStatement.java | 4 +-
.../resource/memory/InsertNodeMemoryEstimator.java | 134 +++----
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 3 +-
.../v1/handler/StatementConstructionHandler.java | 3 +-
.../v1/handler/StatementConstructionHandler.java | 3 +-
.../v2/handler/StatementConstructionHandler.java | 3 +-
.../operator/process/AbstractIntoOperator.java | 6 +-
.../plan/parser/StatementGenerator.java | 54 ++-
.../planner/plan/node/write/InsertTabletNode.java | 40 ++-
.../node/write/RelationalInsertTabletNode.java | 2 +-
.../plan/relational/sql/ast/InsertTablet.java | 3 +-
.../plan/statement/crud/InsertBaseStatement.java | 3 +-
.../plan/statement/crud/InsertRowStatement.java | 7 +-
.../plan/statement/crud/InsertTabletStatement.java | 390 +++++++++++++++++----
.../db/storageengine/dataregion/DataRegion.java | 4 +-
.../memtable/AbstractWritableMemChunk.java | 16 +-
.../memtable/AlignedWritableMemChunk.java | 67 +---
.../dataregion/memtable/IWritableMemChunk.java | 16 +-
.../dataregion/memtable/WritableMemChunk.java | 20 +-
.../dataregion/wal/buffer/WALBuffer.java | 1 -
.../storageengine/dataregion/wal/node/WALNode.java | 4 +-
.../wal/recover/file/TsFilePlanRedoer.java | 4 +-
.../LoadConvertedInsertTabletStatement.java | 4 +-
.../db/trigger/executor/TriggerFireVisitor.java | 4 +-
.../org/apache/iotdb/db/utils/CommonUtils.java | 4 +-
.../java/org/apache/iotdb/db/utils/MemUtils.java | 3 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 37 +-
.../db/utils/datastructure/AlignedTVList.java | 207 ++++++++---
.../iotdb/db/utils/datastructure/TVList.java | 27 +-
.../plan/parser/StatementGeneratorTest.java | 46 ++-
.../planner/node/write/WritePlanNodeSplitTest.java | 8 +-
.../plan/relational/analyzer/AnalyzerTest.java | 5 +-
37 files changed, 927 insertions(+), 386 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d06c8a49ad5..0ca51cb0830 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1144,9 +1144,7 @@ public class IoTDBConfig {
private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;
- /**
- * Use PrimitiveArrayManager to create arrays for InsertTablet requests.
- */
+ /** Use PrimitiveArrayManager to create arrays for InsertTablet requests. */
private boolean usePamForInsertTablet = true;
IoTDBConfig() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1c7432db378..fe4429c1da5 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1092,10 +1092,10 @@ public class IoTDBDescriptor {
"detail_container_min_degrade_memory_in_bytes",
String.valueOf(conf.getDetailContainerMinDegradeMemoryInBytes()))));
- conf.setUsePamForInsertTablet(Boolean.parseBoolean(
- properties.getProperty("use_pam_for_insert_tablet",
- String.valueOf(conf.isUsePamForInsertTablet()))
- ));
+ conf.setUsePamForInsertTablet(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "use_pam_for_insert_tablet",
String.valueOf(conf.isUsePamForInsertTablet()))));
loadIoTConsensusProps(properties);
loadIoTConsensusV2Props(properties);
@@ -2065,10 +2065,10 @@ public class IoTDBDescriptor {
"tvlist_sort_threshold",
ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold"))));
- conf.setUsePamForInsertTablet(Boolean.parseBoolean(
- properties.getProperty("use_pam_for_insert_tablet",
- String.valueOf(conf.isUsePamForInsertTablet()))
- ));
+ conf.setUsePamForInsertTablet(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "use_pam_for_insert_tablet",
String.valueOf(conf.isUsePamForInsertTablet()))));
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 43d58119b00..d129dfe9ee4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -111,7 +111,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
LOGGER.warn(
"Batch failure in executing a InsertTabletNode. device: {},
startTime: {}, measurements: {}, failing status: {}",
node.getTargetPath(),
- node.getTimes()[0],
+ node.getTimes().get(0),
node.getMeasurements(),
e.getFailingStatus());
// For each error
@@ -184,7 +184,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
LOGGER.warn(
"Insert tablet failed. device: {}, startTime: {}, measurements:
{}, failing status: {}",
insertTabletNode.getTargetPath(),
- insertTabletNode.getTimes()[0],
+ insertTabletNode.getTimes().get(0),
insertTabletNode.getMeasurements(),
failedEntry.getValue());
// Return WRITE_PROCESS_REJECT directly for the consensus retry logic
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index 713e5da872c..c75ce9910c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.SingleArrayTimeView;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TimeView;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.ValueView;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -119,9 +122,10 @@ public abstract class TabletInsertionEventParser {
this.deviceId = insertRowNode.getDeviceID();
this.isAligned = insertRowNode.isAligned();
- final long[] originTimestampColumn = new long[] {insertRowNode.getTime()};
+ final TimeView originTimestampColumn =
+ new SingleArrayTimeView(new long[] {insertRowNode.getTime()});
final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
- this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
+ this.timestampColumn =
rowIndexList.stream().mapToLong(originTimestampColumn::get).toArray();
generateColumnIndexMapper(
insertRowNode.getMeasurements(),
originColumnIndex2FilteredColumnIndexMapperList);
@@ -198,10 +202,10 @@ public abstract class TabletInsertionEventParser {
this.deviceId = insertTabletNode.getDeviceID();
this.isAligned = insertTabletNode.isAligned();
- final long[] originTimestampColumn = insertTabletNode.getTimes();
- final int originRowSize = originTimestampColumn.length;
+ final TimeView originTimestampColumn = insertTabletNode.getTimes();
+ final int originRowSize = originTimestampColumn.length();
final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
- this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
+ this.timestampColumn =
rowIndexList.stream().mapToLong(originTimestampColumn::get).toArray();
generateColumnIndexMapper(
insertTabletNode.getMeasurements(),
originColumnIndex2FilteredColumnIndexMapperList);
@@ -224,7 +228,7 @@ public abstract class TabletInsertionEventParser {
final String[] originColumnNameStringList =
insertTabletNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories =
insertTabletNode.getColumnCategories();
final TSDataType[] originValueColumnDataTypes =
insertTabletNode.getDataTypes();
- final Object[] originValueColumns = insertTabletNode.getColumns();
+ final ValueView originValueColumns = insertTabletNode.getColumns();
final BitMap[] originBitMapList =
(insertTabletNode.getBitMaps() == null
? IntStream.range(0, originColumnSize)
@@ -249,8 +253,7 @@ public abstract class TabletInsertionEventParser {
: Tablet.ColumnCategory.FIELD;
this.valueColumnDataTypes[filteredColumnIndex] =
originValueColumnDataTypes[i];
final BitMap bitMap = new BitMap(this.timestampColumn.length);
- if (Objects.isNull(originValueColumns[i])
- || Objects.isNull(originValueColumnDataTypes[i])) {
+ if (Objects.isNull(originValueColumnDataTypes[i])) {
fillNullValue(
originValueColumnDataTypes[i],
this.valueColumns,
@@ -261,9 +264,9 @@ public abstract class TabletInsertionEventParser {
this.valueColumns[filteredColumnIndex] =
filterValueColumnsByRowIndexList(
originValueColumnDataTypes[i],
- originValueColumns[i],
+ originValueColumns,
+ i,
rowIndexList,
- false,
originBitMapList[i],
bitMap);
}
@@ -296,7 +299,8 @@ public abstract class TabletInsertionEventParser {
Arrays.copyOf(
tablet.getTimestamps(),
tablet.getRowSize()); // tablet.timestamps.length ==
tablet.maxRowNumber
- final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
+ final List<Integer> rowIndexList =
+ generateRowIndexList(new SingleArrayTimeView(originTimestampColumn));
this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
final List<IMeasurementSchema> originMeasurementSchemaList =
tablet.getSchemas();
@@ -394,22 +398,22 @@ public abstract class TabletInsertionEventParser {
final String[] originMeasurementList,
final Integer[] originColumnIndex2FilteredColumnIndexMapperList);
- private List<Integer> generateRowIndexList(final long[]
originTimestampColumn) {
- final int rowCount = originTimestampColumn.length;
+ private List<Integer> generateRowIndexList(final TimeView
originTimestampColumn) {
+ final int rowCount = originTimestampColumn.length();
if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
return generateFullRowIndexList(rowCount);
}
final List<Integer> rowIndexList = new ArrayList<>();
// We assume that `originTimestampColumn` is ordered.
- if (originTimestampColumn[originTimestampColumn.length - 1] <
sourceEvent.getStartTime()
- || originTimestampColumn[0] > sourceEvent.getEndTime()) {
+ if (originTimestampColumn.get(originTimestampColumn.length() - 1) <
sourceEvent.getStartTime()
+ || originTimestampColumn.get(0) > sourceEvent.getEndTime()) {
return rowIndexList;
}
for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
- if (sourceEvent.getStartTime() <= originTimestampColumn[rowIndex]
- && originTimestampColumn[rowIndex] <= sourceEvent.getEndTime()) {
+ if (sourceEvent.getStartTime() <= originTimestampColumn.get(rowIndex)
+ && originTimestampColumn.get(rowIndex) <= sourceEvent.getEndTime()) {
rowIndexList.add(rowIndex);
}
}
@@ -583,6 +587,122 @@ public abstract class TabletInsertionEventParser {
}
}
+ private static Object filterValueColumnsByRowIndexList(
+ @NonNull final TSDataType type,
+ @NonNull final ValueView originValueColumns,
+ final int columnIndex,
+ @NonNull final List<Integer> rowIndexList,
+ @NonNull final BitMap originNullValueColumnBitmap,
+ @NonNull final BitMap nullValueColumnBitmap /* output parameters */) {
+ switch (type) {
+ case INT32:
+ {
+ final int[] valueColumns = new int[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = ((int)
originValueColumns.get(rowIndexList.get(i), columnIndex));
+ }
+ }
+ return valueColumns;
+ }
+ case DATE:
+ {
+ // Always store 'LocalDate[]' to help convert to tablet
+ final LocalDate[] valueColumns = new LocalDate[rowIndexList.size()];
+ // Only insertTabletNode uses this method
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = EMPTY_LOCALDATE;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] =
+ DateUtils.parseIntToLocalDate(
+ ((int) originValueColumns.get(rowIndexList.get(i),
columnIndex)));
+ }
+ }
+ return valueColumns;
+ }
+ case INT64:
+ case TIMESTAMP:
+ {
+ final long[] valueColumns = new long[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0L;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = ((long)
originValueColumns.get(rowIndexList.get(i), columnIndex));
+ }
+ }
+ return valueColumns;
+ }
+ case FLOAT:
+ {
+ final float[] valueColumns = new float[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0F;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = ((float)
originValueColumns.get(rowIndexList.get(i), columnIndex));
+ }
+ }
+ return valueColumns;
+ }
+ case DOUBLE:
+ {
+ final double[] valueColumns = new double[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = 0D;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = ((double)
originValueColumns.get(rowIndexList.get(i), columnIndex));
+ }
+ }
+ return valueColumns;
+ }
+ case BOOLEAN:
+ {
+ final boolean[] valueColumns = new boolean[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = false;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] =
+ ((boolean) originValueColumns.get(rowIndexList.get(i),
columnIndex));
+ }
+ }
+ return valueColumns;
+ }
+ case TEXT:
+ case BLOB:
+ case STRING:
+ {
+ final Binary[] valueColumns = new Binary[rowIndexList.size()];
+ for (int i = 0; i < rowIndexList.size(); ++i) {
+ Binary binary = (Binary)
originValueColumns.get(rowIndexList.get(i), columnIndex);
+ if (Objects.isNull(binary)
+ || Objects.isNull(binary.getValues())
+ || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ valueColumns[i] = Binary.EMPTY_VALUE;
+ nullValueColumnBitmap.mark(i);
+ } else {
+ valueColumns[i] = new Binary(binary.getValues());
+ }
+ }
+ return valueColumns;
+ }
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ }
+
private void fillNullValue(
final TSDataType type,
final Object[] valueColumns,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 5805f6b2ba6..d589d9c8bf5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -136,7 +136,8 @@ public class PipeDataRegionAssigner implements Closeable {
}
if (innerEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- InsertNode insertNode = ((PipeInsertNodeTabletInsertionEvent)
innerEvent).getInsertNodeViaCacheIfPossible();
+ InsertNode insertNode =
+ ((PipeInsertNodeTabletInsertionEvent)
innerEvent).getInsertNodeViaCacheIfPossible();
if (insertNode instanceof InsertTabletNode) {
InsertTabletNode insertTabletNode = (InsertTabletNode) insertNode;
synchronized (insertTabletNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index eee47a5b0b1..2ee370b4fc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.receiver.transform.statement;
-import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.annotations.TableModel;
@@ -103,8 +102,7 @@ public class PipeConvertedInsertTabletStatement extends
InsertTabletStatement {
measurements[columnIndex],
dataTypes[columnIndex],
dataType);
- columns[columnIndex] =
- ArrayConverter.convert(dataTypes[columnIndex], dataType,
columns[columnIndex]);
+ columns.castTo(columnIndex, dataType);
dataTypes[columnIndex] = dataType;
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index 1071d1a40f5..9a207e42618 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.pipe.resource.memory;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.lang3.stream.IntStreams;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -56,6 +53,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class InsertNodeMemoryEstimator {
@@ -588,7 +587,9 @@ public class InsertNodeMemoryEstimator {
// if columnsToCalculate is null, all columns are calculated
public static long sizeOfColumns(
- final Object[] columns, final MeasurementSchema[] measurementSchemas,
List<Integer> columnsToCalculate) {
+ final Object[] columns,
+ final MeasurementSchema[] measurementSchemas,
+ List<Integer> columnsToCalculate) {
// Directly calculate if measurementSchemas are absent
if (Objects.isNull(measurementSchemas)) {
return RamUsageEstimator.shallowSizeOf(columns)
@@ -609,33 +610,39 @@ public class InsertNodeMemoryEstimator {
}
switch (measurementSchemas[columnIndex].getType()) {
case INT64:
- case TIMESTAMP: {
- size += RamUsageEstimator.sizeOf((long[]) columns[columnIndex]);
- break;
- }
+ case TIMESTAMP:
+ {
+ size += RamUsageEstimator.sizeOf((long[]) columns[columnIndex]);
+ break;
+ }
case DATE:
- case INT32: {
- size += RamUsageEstimator.sizeOf((int[]) columns[columnIndex]);
- break;
- }
- case DOUBLE: {
- size += RamUsageEstimator.sizeOf((double[]) columns[columnIndex]);
- break;
- }
- case FLOAT: {
- size += RamUsageEstimator.sizeOf((float[]) columns[columnIndex]);
- break;
- }
- case BOOLEAN: {
- size += RamUsageEstimator.sizeOf((boolean[]) columns[columnIndex]);
- break;
- }
+ case INT32:
+ {
+ size += RamUsageEstimator.sizeOf((int[]) columns[columnIndex]);
+ break;
+ }
+ case DOUBLE:
+ {
+ size += RamUsageEstimator.sizeOf((double[]) columns[columnIndex]);
+ break;
+ }
+ case FLOAT:
+ {
+ size += RamUsageEstimator.sizeOf((float[]) columns[columnIndex]);
+ break;
+ }
+ case BOOLEAN:
+ {
+ size += RamUsageEstimator.sizeOf((boolean[]) columns[columnIndex]);
+ break;
+ }
case STRING:
case TEXT:
- case BLOB: {
- size += getBinarySize((Binary[]) columns[columnIndex]);
- break;
- }
+ case BLOB:
+ {
+ size += getBinarySize((Binary[]) columns[columnIndex]);
+ break;
+ }
}
}
return size;
@@ -643,13 +650,15 @@ public class InsertNodeMemoryEstimator {
// if columnsToCalculate is null, all columns are calculated
public static long sizeOfColumns(
- final Object[][] columns, final MeasurementSchema[] measurementSchemas,
List<Integer> columnsToCalculate) {
+ final Object[][] columns,
+ final MeasurementSchema[] measurementSchemas,
+ List<Integer> columnsToCalculate) {
// Directly calculate if measurementSchemas are absent
if (Objects.isNull(measurementSchemas)) {
return RamUsageEstimator.shallowSizeOf(columns)
+ Arrays.stream(columns)
- .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
- .reduce(0L, Long::sum);
+ .mapToLong(InsertNodeMemoryEstimator::getNumBytesUnknownObject)
+ .reduce(0L, Long::sum);
}
if (columnsToCalculate == null) {
columnsToCalculate = IntStream.range(0,
columns.length).boxed().collect(Collectors.toList());
@@ -658,56 +667,57 @@ public class InsertNodeMemoryEstimator {
RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER * (columns.length + 1) +
NUM_BYTES_OBJECT_REF * columns.length);
for (int columnIndex : columnsToCalculate) {
- if (measurementSchemas[columnIndex] == null ||
measurementSchemas[columnIndex].getType() == null) {
+ if (measurementSchemas[columnIndex] == null
+ || measurementSchemas[columnIndex].getType() == null) {
continue;
}
switch (measurementSchemas[columnIndex].getType()) {
case INT64:
case TIMESTAMP:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((long[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((long[]) o);
+ }
+ break;
}
- break;
- }
case DATE:
case INT32:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((int[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((int[]) o);
+ }
+ break;
}
- break;
- }
case DOUBLE:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((double[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((double[]) o);
+ }
+ break;
}
- break;
- }
case FLOAT:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((float[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((float[]) o);
+ }
+ break;
}
- break;
- }
case BOOLEAN:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((boolean[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((boolean[]) o);
+ }
+ break;
}
- break;
- }
case STRING:
case TEXT:
case BLOB:
- {
- for (Object o : columns[columnIndex]) {
- size += RamUsageEstimator.sizeOf((Binary[]) o);
+ {
+ for (Object o : columns[columnIndex]) {
+ size += RamUsageEstimator.sizeOf((Binary[]) o);
+ }
+ break;
}
- break;
- }
}
}
return size;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index ac3a82d9730..a1e652cbd5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TwoDArrayValueView;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -232,7 +233,6 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
Stream.of(message.getValues(), message.getTagValues(),
message.getAttributeValues())
.flatMap(List::stream)
.toArray(Object[]::new);
- insertStatement.setColumns(columns);
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(rowSize);
insertStatement.setAligned(false);
@@ -259,6 +259,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
}
insertStatement.setDataTypes(dataTypes);
+ insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
insertStatement.setColumnCategories(columnCategories);
return insertStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
index 10d32d03147..5d993e5428f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.protocol.rest.table.v1.model.InsertTabletRequest;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TwoDArrayValueView;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -168,7 +169,7 @@ public class StatementConstructionHandler {
throw new IllegalArgumentException("Invalid input: " +
rawDataType.get(columnIndex));
}
}
- insertStatement.setColumns(columns);
+ insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(rowSize);
insertStatement.setDataTypes(dataTypes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
index 523b8ccfb6a..61d4c7dc406 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.protocol.rest.v1.model.InsertTabletRequest;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TwoDArrayValueView;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -169,10 +170,10 @@ public class StatementConstructionHandler {
insertStatement.setTimes(
insertTabletRequest.getTimestamps().stream().mapToLong(Long::longValue).toArray());
- insertStatement.setColumns(columns);
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
insertStatement.setDataTypes(dataTypes);
+ insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
insertStatement.setAligned(insertTabletRequest.getIsAligned());
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
index 4bf9a1a5510..f9e5f1bb604 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDeviceP
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TwoDArrayValueView;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -179,10 +180,10 @@ public class StatementConstructionHandler {
insertStatement.setTimes(
insertTabletRequest.getTimestamps().stream().mapToLong(Long::longValue).toArray());
- insertStatement.setColumns(columns);
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
insertStatement.setDataTypes(dataTypes);
+ insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
insertStatement.setAligned(insertTabletRequest.getIsAligned());
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 99317ba7b09..15afe24a90c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -30,6 +30,8 @@ import
org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.SingleArrayTimeView;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.TwoDArrayValueView;
import org.apache.iotdb.rpc.TSStatusCode;
import com.google.common.util.concurrent.Futures;
@@ -538,9 +540,9 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
}
}
- insertTabletStatement.setTimes(times);
+ insertTabletStatement.setTimes(new SingleArrayTimeView(times));
insertTabletStatement.setBitMaps(bitMaps);
- insertTabletStatement.setColumns(columns);
+ insertTabletStatement.setColumns(new TwoDArrayValueView(columns,
dataTypes, rowCount));
return insertTabletStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index a30c162f32a..50cddde149a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.parser;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -120,6 +119,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
/** Convert SQL and RPC requests to {@link Statement}. */
public class StatementGenerator {
@@ -331,30 +331,42 @@ public class StatementGenerator {
return insertStatement;
}
- private static void deserializeTimeValue(InsertTabletStatement
insertTabletStatement,
- ByteBuffer timeBuffer, ByteBuffer valueBuffer, int rowSize, TSDataType[]
dataTypes) {
+ private static void deserializeTimeValue(
+ InsertTabletStatement insertTabletStatement,
+ ByteBuffer timeBuffer,
+ ByteBuffer valueBuffer,
+ int rowSize,
+ TSDataType[] dataTypes) {
if (!IoTDBDescriptor.getInstance().getConfig().isUsePamForInsertTablet()) {
- long[] timestamps =
- QueryDataSetUtils.readTimesFromBuffer(timeBuffer, rowSize);
+ long[] timestamps = QueryDataSetUtils.readTimesFromBuffer(timeBuffer,
rowSize);
if (timestamps.length != 0) {
TimestampPrecisionUtils.checkTimestampPrecision(timestamps[0]);
TimestampPrecisionUtils.checkTimestampPrecision(timestamps[timestamps.length -
1]);
}
insertTabletStatement.setTimes(new SingleArrayTimeView(timestamps));
- insertTabletStatement.setColumns(new TwoDArrayValueView(
- QueryDataSetUtils.readTabletValuesFromBuffer(
- valueBuffer,
+ insertTabletStatement.setColumns(
+ new TwoDArrayValueView(
+ QueryDataSetUtils.readTabletValuesFromBuffer(
+ valueBuffer, dataTypes, dataTypes.length, rowSize),
dataTypes,
- dataTypes.length,
- rowSize), dataTypes, rowSize));
+ rowSize));
} else {
long[][] timestamps =
QueryDataSetUtils.readTimesFromBufferWithPam(timeBuffer, rowSize);
if (timestamps.length != 0) {
TimestampPrecisionUtils.checkTimestampPrecision(timestamps[0][0]);
- TimestampPrecisionUtils.checkTimestampPrecision(timestamps[rowSize /
PrimitiveArrayManager.ARRAY_SIZE][rowSize % PrimitiveArrayManager.ARRAY_SIZE]);
+ TimestampPrecisionUtils.checkTimestampPrecision(
+ timestamps[rowSize / PrimitiveArrayManager.ARRAY_SIZE][
+ rowSize % PrimitiveArrayManager.ARRAY_SIZE]);
}
- insertTabletStatement.setTimes(new
MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, timestamps, rowSize));
- insertTabletStatement.setColumns(new
ThreeDArrayValueView(QueryDataSetUtils.readTabletValuesFromBufferWithPam(valueBuffer,
dataTypes, dataTypes.length, rowSize), dataTypes, rowSize,
PrimitiveArrayManager.ARRAY_SIZE));
+ insertTabletStatement.setTimes(
+ new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, timestamps,
rowSize));
+ insertTabletStatement.setColumns(
+ new ThreeDArrayValueView(
+ QueryDataSetUtils.readTabletValuesFromBufferWithPam(
+ valueBuffer, dataTypes, dataTypes.length, rowSize),
+ dataTypes,
+ rowSize,
+ PrimitiveArrayManager.ARRAY_SIZE));
insertTabletStatement.setRefCount(new AtomicInteger(1));
}
}
@@ -373,8 +385,12 @@ public class StatementGenerator {
dataTypes[i] = TSDataType.deserialize((byte)
insertTabletReq.types.get(i).intValue());
}
- deserializeTimeValue(insertStatement, insertTabletReq.timestamps,
insertTabletReq.values,
- insertTabletReq.size, dataTypes);
+ deserializeTimeValue(
+ insertStatement,
+ insertTabletReq.timestamps,
+ insertTabletReq.values,
+ insertTabletReq.size,
+ dataTypes);
insertStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
@@ -419,8 +435,12 @@ public class StatementGenerator {
dataTypes[j] = TSDataType.deserialize((byte)
req.typesList.get(i).get(j).intValue());
}
- deserializeTimeValue(insertTabletStatement, req.timestampsList.get(i),
req.getValuesList().get(i),
- req.sizeList.get(i), dataTypes);
+ deserializeTimeValue(
+ insertTabletStatement,
+ req.timestampsList.get(i),
+ req.getValuesList().get(i),
+ req.sizeList.get(i),
+ dataTypes);
insertTabletStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 7e78e12c7cf..4b082b68746 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -37,7 +36,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.MultiArrayTimeView;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.SingleArrayTimeView;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement.ThreeDArrayValueView;
@@ -80,6 +78,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
@@ -189,6 +188,10 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
this.times = times;
}
+ public void setTimes(long[] times) {
+ this.times = new SingleArrayTimeView(times);
+ }
+
public BitMap[] getBitMaps() {
return bitMaps;
}
@@ -205,6 +208,13 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
this.columns = columns;
}
+ public void setColumns(Object[] columns) {
+ if (dataTypes == null || rowCount == 0) {
+ throw new IllegalArgumentException("dataTypes and rowCount must be set
first");
+ }
+ this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ }
+
public int getRowCount() {
return rowCount;
}
@@ -382,8 +392,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
bitMaps,
new ThreeDArrayValueView(values, dataTypes, count,
PrimitiveArrayManager.ARRAY_SIZE),
count,
- new AtomicInteger(1)
- );
+ new AtomicInteger(1));
}
}
@@ -473,7 +482,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return values;
}
- protected Object[][] initTabletValuesWithPam(int columnSize, int rowSize,
TSDataType[] dataTypes) {
+ protected Object[][] initTabletValuesWithPam(
+ int columnSize, int rowSize, TSDataType[] dataTypes) {
Object[][] values = new Object[columnSize][];
int numOfArrays = PrimitiveArrayManager.numOfArrays(rowSize);
for (int i = 0; i < values.length; i++) {
@@ -674,7 +684,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
}
-
public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
insertNode.subDeserialize(byteBuffer);
@@ -719,7 +728,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize,
rowCount).orElse(null);
}
columns =
- new
TwoDArrayValueView(QueryDataSetUtils.readTabletValuesFromBuffer(buffer,
dataTypes, measurementSize, rowCount), dataTypes, rowCount);
+ new TwoDArrayValueView(
+ QueryDataSetUtils.readTabletValuesFromBuffer(
+ buffer, dataTypes, measurementSize, rowCount),
+ dataTypes,
+ rowCount);
isAligned = buffer.get() == 1;
}
@@ -890,7 +903,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize,
rowCount).orElse(null);
}
columns =
- new
TwoDArrayValueView(QueryDataSetUtils.readTabletValuesFromStream(stream,
dataTypes, measurementSize, rowCount), dataTypes, rowCount);
+ new TwoDArrayValueView(
+ QueryDataSetUtils.readTabletValuesFromStream(
+ stream, dataTypes, measurementSize, rowCount),
+ dataTypes,
+ rowCount);
isAligned = stream.readByte() == 1;
}
@@ -929,7 +946,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize,
rowCount).orElse(null);
}
columns =
- new
TwoDArrayValueView(QueryDataSetUtils.readTabletValuesFromBuffer(buffer,
dataTypes, measurementSize, rowCount), dataTypes, rowCount);
+ new TwoDArrayValueView(
+ QueryDataSetUtils.readTabletValuesFromBuffer(
+ buffer, dataTypes, measurementSize, rowCount),
+ dataTypes,
+ rowCount);
isAligned = buffer.get() == 1;
}
@@ -963,7 +984,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
&& Objects.equals(range, that.range);
}
-
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertTablet(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 3239e225f0a..be25c07e70e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -53,6 +52,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
public class RelationalInsertTabletNode extends InsertTabletNode {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
index 9894adb4d01..0a78ada2d71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
@@ -87,8 +87,7 @@ public class InsertTablet extends WrappedInsertStatement {
for (int attrColNum = 0; attrColNum < attrColumnIndices.size();
attrColNum++) {
final int columnIndex = attrColumnIndices.get(attrColNum);
if (!insertTabletStatement.isNull(rowIndex, columnIndex)) {
- attrValues[attrColNum] =
- ((Object[])
insertTabletStatement.getColumns()[columnIndex])[rowIndex];
+ attrValues[attrColNum] =
insertTabletStatement.getColumns().get(rowIndex, columnIndex);
}
}
result.add(attrValues);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index ce266dc3fc8..99eb07a780d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -430,8 +430,7 @@ public abstract class InsertBaseStatement extends Statement
implements Accountab
protected TSDataType dataType;
protected Exception cause;
- public FailedMeasurementInfo(
- String measurement, TSDataType dataType, Exception cause) {
+ public FailedMeasurementInfo(String measurement, TSDataType dataType,
Exception cause) {
this.measurement = measurement;
this.dataType = dataType;
this.cause = cause;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index d851c1642d5..ce7956c32ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -266,8 +266,7 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
}
InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
- new InsertBaseStatement.FailedMeasurementInfo(
- measurements[index], dataTypes[index], values[index], cause);
+ new InsertBaseStatement.FailedMeasurementInfo(measurements[index],
dataTypes[index], cause);
failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
measurements[index] = null;
@@ -289,10 +288,6 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
if (dataTypes != null) {
dataTypes[index] = info.getDataType();
}
-
- if (values != null) {
- values[index] = info.getValue();
- }
});
failedMeasurementIndex2Info.clear();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index ec9f2fc6213..a91b0f8dd02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -19,12 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.statement.crud;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
@@ -47,9 +42,9 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.CommonUtils;
-
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -64,6 +59,9 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -71,6 +69,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
@@ -139,6 +139,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
this.columns = columns;
}
+ public void setColumns(Object[] columns) {
+ if (dataTypes == null || rowCount == 0) {
+ throw new IllegalArgumentException("dataTypes and rowCount must be set
first");
+ }
+ this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ }
+
public BitMap[] getBitMaps() {
return nullBitMaps;
}
@@ -155,6 +162,10 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
this.times = times;
}
+ public void setTimes(long[] times) {
+ this.times = new SingleArrayTimeView(times);
+ }
+
@Override
public boolean isEmpty() {
return rowCount == 0
@@ -231,8 +242,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
- new InsertBaseStatement.FailedMeasurementInfo(
- measurements[index], dataTypes[index], cause);
+ new InsertBaseStatement.FailedMeasurementInfo(measurements[index],
dataTypes[index], cause);
failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
measurements[index] = null;
@@ -300,8 +310,9 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
statement.setAligned(this.measurementIsAligned[realIndex]);
}
}
- statement.setColumns(new ColumnMappedValueView(columns,
pairList.stream().map(Pair::getRight).collect(
- Collectors.toList())));
+ statement.setColumns(
+ new ColumnMappedValueView(
+ columns,
pairList.stream().map(Pair::getRight).collect(Collectors.toList())));
statement.setMeasurements(measurements);
statement.setMeasurementSchemas(measurementSchemas);
statement.setDataTypes(dataTypes);
@@ -444,8 +455,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
for (int i = 0; i < getIdColumnIndices().size(); i++) {
final Integer columnIndex = getIdColumnIndices().get(i);
boolean isNull = isNull(rowIdx, i);
- deviceIdSegments[i + 1] =
- isNull ? null : columns.get(rowIdx, columnIndex).toString();
+ deviceIdSegments[i + 1] = isNull ? null : columns.get(rowIdx,
columnIndex).toString();
}
deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
}
@@ -508,10 +518,10 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
+ InsertNodeMemoryEstimator.sizeOfBitMapArray(nullBitMaps)
+ columns.ramSize(measurementSchemas)
+ (Objects.nonNull(deviceIDs)
- ? Arrays.stream(deviceIDs)
- .mapToLong(InsertNodeMemoryEstimator::sizeOfIDeviceID)
- .reduce(0L, Long::sum)
- : 0L);
+ ? Arrays.stream(deviceIDs)
+ .mapToLong(InsertNodeMemoryEstimator::sizeOfIDeviceID)
+ .reduce(0L, Long::sum)
+ : 0L);
}
public boolean isNull(int row, int col) {
@@ -575,6 +585,10 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
void release();
void putTo(TVList tvList, BitMap bitMap, int start, int end);
+
+ // for compatibility only, avoid using it
+ @Deprecated
+ long[] toSingleArray();
}
public static class SingleArrayTimeView implements TimeView {
@@ -609,8 +623,12 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void copyTo(TimeView timeView, int thisFrom, int targetFrom, int
copyLength) {
if (timeView instanceof SingleArrayTimeView) {
copyLength = Math.min(copyLength, this.length() - thisFrom);
- System.arraycopy(this.timestamps, thisFrom, ((SingleArrayTimeView)
timeView).timestamps,
- targetFrom, copyLength);
+ System.arraycopy(
+ this.timestamps,
+ thisFrom,
+ ((SingleArrayTimeView) timeView).timestamps,
+ targetFrom,
+ copyLength);
} else {
TimeView.super.copyTo(timeView, thisFrom, targetFrom, copyLength);
}
@@ -625,6 +643,11 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void putTo(TVList tvList, BitMap bitMap, int start, int end) {
tvList.putTimes(timestamps, bitMap, start, end);
}
+
+ @Override
+ public long[] toSingleArray() {
+ return timestamps;
+ }
}
public static class MultiArrayTimeView implements TimeView {
@@ -665,8 +688,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
&& this.singleArraySize == ((MultiArrayTimeView)
timeView).singleArraySize) {
copyLength = Math.min(copyLength, this.length() - thisFrom);
while (copyLength > 0) {
- int singleCopyLength = copyOneArrayTo(((MultiArrayTimeView)
timeView), thisFrom,
- targetFrom, copyLength);
+ int singleCopyLength =
+ copyOneArrayTo(((MultiArrayTimeView) timeView), thisFrom,
targetFrom, copyLength);
thisFrom += singleCopyLength;
targetFrom += singleCopyLength;
copyLength -= singleCopyLength;
@@ -676,8 +699,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
}
- private int copyOneArrayTo(MultiArrayTimeView target, int thisFrom, int
targetFrom,
- int copyLength) {
+ private int copyOneArrayTo(
+ MultiArrayTimeView target, int thisFrom, int targetFrom, int
copyLength) {
int srcArrayPos = thisFrom % singleArraySize;
int srcArrayRemaining = singleArraySize - srcArrayPos;
copyLength = Math.min(copyLength, srcArrayRemaining);
@@ -689,7 +712,11 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
if (copyLength > targetArrayRemaining) {
System.arraycopy(srcArray, srcArrayPos, targetArray, targetArrayPos,
targetArrayRemaining);
long[] nextTargetArray = target.timestamps[targetFrom /
singleArraySize + 1];
- System.arraycopy(srcArray, srcArrayPos + targetArrayRemaining,
nextTargetArray, 0,
+ System.arraycopy(
+ srcArray,
+ srcArrayPos + targetArrayRemaining,
+ nextTargetArray,
+ 0,
copyLength - targetArrayRemaining);
} else {
System.arraycopy(srcArray, srcArrayPos, targetArray, targetArrayPos,
copyLength);
@@ -719,11 +746,32 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
int arrayStart = current % singleArraySize;
int arrayEnd = arrayStart + copyLength;
- tvList.putTimes(timestamps[arrayIndex], bitMap.getRegion(current,
current + copyLength), arrayStart, arrayEnd);
+ tvList.putTimes(
+ timestamps[arrayIndex],
+ bitMap.getRegion(current, current + copyLength),
+ arrayStart,
+ arrayEnd);
current += copyLength;
}
}
+
+ @Override
+ public long[] toSingleArray() {
+ long[] singleArray = new long[length];
+ int arrayIndex = 0;
+ for (; arrayIndex < timestamps.length - 1; arrayIndex++) {
+ System.arraycopy(
+ timestamps[arrayIndex], 0, singleArray, arrayIndex *
singleArraySize, singleArraySize);
+ }
+ System.arraycopy(
+ timestamps[arrayIndex],
+ 0,
+ singleArray,
+ arrayIndex * singleArraySize,
+ length % singleArraySize);
+ return singleArray;
+ }
}
public interface ValueView {
@@ -738,8 +786,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
TSDataType[] dataTypes();
- default void copyTo(ValueView valueView, int colIndex, int thisFrom, int
targetFrom,
- int copyLength) {
+ default void copyTo(
+ ValueView valueView, int colIndex, int thisFrom, int targetFrom, int
copyLength) {
copyLength = Math.min(copyLength, this.rowLength() - thisFrom);
for (int i = 0; i < copyLength; i++) {
valueView.set(targetFrom + i, colIndex, get(thisFrom + i, colIndex));
@@ -747,7 +795,9 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
void serializeColumn(int colIndex, ByteBuffer buffer);
+
void serializeColumn(int colIndex, DataOutputStream stream) throws
IOException;
+
void serializeColumn(int colIndex, IWALByteBufferView buffer, int start,
int end);
default long getColumnSize(int colIndex, int start, int end) {
@@ -787,15 +837,29 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
void castTo(int colIndex, TSDataType newType);
void insertColumn(int pos, ColumnSchema columnSchema);
+
void swapColumn(int src, int target);
long ramSize(MeasurementSchema[] measurementSchemas);
+
void reserveColumns(List<Integer> columnsToReserve);
void release();
void putTo(TVList tvList, int columnIndex, BitMap bitMap, int start, int
end, int pos);
- void putTo(AlignedTVList tvList, List<Integer> columnIndices, BitMap[]
bitMaps, int start, int end, int pos);
+
+ void putTo(
+ AlignedTVList tvList,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int pos);
+
+ // for compatibility only, do no use it in new code
+ @Deprecated
+ Object[] toTwoDArray();
}
public static class TwoDArrayValueView implements ValueView {
@@ -879,11 +943,16 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
@Override
- public void copyTo(ValueView valueView, int colIndex, int thisFrom, int
targetFrom,
- int copyLength) {
+ public void copyTo(
+ ValueView valueView, int colIndex, int thisFrom, int targetFrom, int
copyLength) {
if (valueView instanceof TwoDArrayValueView) {
copyLength = Math.min(copyLength, this.rowLength() - thisFrom);
- System.arraycopy(values[colIndex], thisFrom, ((TwoDArrayValueView)
valueView).values[colIndex], targetFrom, copyLength);
+ System.arraycopy(
+ values[colIndex],
+ thisFrom,
+ ((TwoDArrayValueView) valueView).values[colIndex],
+ targetFrom,
+ copyLength);
} else {
ValueView.super.copyTo(valueView, colIndex, thisFrom, targetFrom,
copyLength);
}
@@ -937,7 +1006,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -989,7 +1059,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -1042,7 +1113,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -1062,9 +1134,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
System.arraycopy(values, 0, tmpColumns, 0, pos);
tmpColumns[pos] =
CommonUtils.createValueColumnOfDataType(
- InternalTypeManager.getTSDataType(columnSchema.getType()),
- columnSchema.getColumnCategory(),
- rowLength);
+ InternalTypeManager.getTSDataType(columnSchema.getType()),
rowLength);
System.arraycopy(values, pos, tmpColumns, pos + 1, values.length - pos);
values = tmpColumns;
}
@@ -1093,8 +1163,26 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void putTo(TVList tvList, int columnIndex, BitMap bitMap, int
start, int end, int pos) {
tvList.putValues(values[columnIndex], bitMap, start, end, pos,
rowLength);
}
+
+ @Override
+ public void putTo(
+ AlignedTVList tvList,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int pos) {
+ tvList.putAlignedValues(values, columnIndices, bitMaps, start, end,
results, pos);
+ }
+
+ @Override
+ public Object[] toTwoDArray() {
+ return values;
+ }
}
+ @SuppressWarnings("SuspiciousSystemArraycopy")
public static class ThreeDArrayValueView implements ValueView {
private final TSDataType[] dataTypes;
@@ -1102,7 +1190,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
private final int rowLength;
private final int singleArraySize;
- public ThreeDArrayValueView(Object[][] values, TSDataType[] dataTypes, int
rowLength, int singleArraySize) {
+ public ThreeDArrayValueView(
+ Object[][] values, TSDataType[] dataTypes, int rowLength, int
singleArraySize) {
this.values = values;
this.dataTypes = dataTypes;
this.rowLength = rowLength;
@@ -1127,17 +1216,22 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
return ((int[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
case INT64:
case TIMESTAMP:
- return ((long[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
+ return ((long[]) values[colIndex][rowIndex / singleArraySize])
+ [rowIndex % singleArraySize];
case FLOAT:
- return ((float[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
+ return ((float[]) values[colIndex][rowIndex / singleArraySize])
+ [rowIndex % singleArraySize];
case DOUBLE:
- return ((double[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
+ return ((double[]) values[colIndex][rowIndex / singleArraySize])
+ [rowIndex % singleArraySize];
case TEXT:
case BLOB:
case STRING:
- return ((Binary[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
+ return ((Binary[]) values[colIndex][rowIndex / singleArraySize])
+ [rowIndex % singleArraySize];
case BOOLEAN:
- return ((boolean[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
+ return ((boolean[]) values[colIndex][rowIndex / singleArraySize])
+ [rowIndex % singleArraySize];
case VECTOR:
case UNKNOWN:
default:
@@ -1150,25 +1244,31 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
switch (dataTypes[colIndex]) {
case INT32:
case DATE:
- ((int[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] = ((int) value);
+ ((int[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
+ ((int) value);
return;
case INT64:
case TIMESTAMP:
- ((long[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] = ((long) value);
+ ((long[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
+ ((long) value);
return;
case FLOAT:
- ((float[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] = ((float) value);
+ ((float[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
+ ((float) value);
return;
case DOUBLE:
- ((double[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] = ((double) value);
+ ((double[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
+ ((double) value);
return;
case TEXT:
case BLOB:
case STRING:
- ((Binary[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] = ((Binary) value);
+ ((Binary[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
+ ((Binary) value);
return;
case BOOLEAN:
- ((boolean[]) values[colIndex][rowIndex / singleArraySize])[rowIndex
% singleArraySize] = ((boolean) value);
+ ((boolean[]) values[colIndex][rowIndex / singleArraySize])[rowIndex
% singleArraySize] =
+ ((boolean) value);
return;
case VECTOR:
case UNKNOWN:
@@ -1178,13 +1278,15 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
@Override
- public void copyTo(ValueView valueView, int colIndex, int thisFrom, int
targetFrom,
- int copyLength) {
- if (valueView instanceof ThreeDArrayValueView && this.singleArraySize ==
((ThreeDArrayValueView) valueView).singleArraySize) {
+ public void copyTo(
+ ValueView valueView, int colIndex, int thisFrom, int targetFrom, int
copyLength) {
+ if (valueView instanceof ThreeDArrayValueView
+ && this.singleArraySize == ((ThreeDArrayValueView)
valueView).singleArraySize) {
copyLength = Math.min(copyLength, this.rowLength() - thisFrom);
while (copyLength > 0) {
- int singleCopyLength = copyOneArrayTo(((ThreeDArrayValueView)
valueView), colIndex, thisFrom,
- targetFrom, copyLength);
+ int singleCopyLength =
+ copyOneArrayTo(
+ ((ThreeDArrayValueView) valueView), colIndex, thisFrom,
targetFrom, copyLength);
thisFrom += singleCopyLength;
targetFrom += singleCopyLength;
copyLength -= singleCopyLength;
@@ -1194,8 +1296,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
}
- private int copyOneArrayTo(ThreeDArrayValueView target, int colIndex, int
thisFrom, int targetFrom,
- int copyLength) {
+ private int copyOneArrayTo(
+ ThreeDArrayValueView target, int colIndex, int thisFrom, int
targetFrom, int copyLength) {
int srcArrayPos = thisFrom % singleArraySize;
int srcArrayRemaining = singleArraySize - srcArrayPos;
copyLength = Math.min(copyLength, srcArrayRemaining);
@@ -1207,7 +1309,11 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
if (copyLength > targetArrayRemaining) {
System.arraycopy(srcArray, srcArrayPos, targetArray, targetArrayPos,
targetArrayRemaining);
Object nextTargetArray = target.values[colIndex][targetFrom /
singleArraySize + 1];
- System.arraycopy(srcArray, srcArrayPos + targetArrayRemaining,
nextTargetArray, 0,
+ System.arraycopy(
+ srcArray,
+ srcArrayPos + targetArrayRemaining,
+ nextTargetArray,
+ 0,
copyLength - targetArrayRemaining);
} else {
System.arraycopy(srcArray, srcArrayPos, targetArray, targetArrayPos,
copyLength);
@@ -1300,7 +1406,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -1389,7 +1496,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -1504,7 +1612,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
break;
default:
- throw new
UnSupportedDataTypeException(String.format(DATATYPE_UNSUPPORTED,
dataTypes[colIndex]));
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
}
}
@@ -1529,7 +1638,9 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
int arrayNum = rowLength / singleArraySize + rowLength % singleArraySize
== 0 ? 0 : 1;
tmpColumns[pos] = new Object[arrayNum];
for (int i = 0; i < arrayNum; i++) {
- tmpColumns[pos][i] =
PrimitiveArrayManager.allocate(InternalTypeManager.getTSDataType(columnSchema.getType()));
+ tmpColumns[pos][i] =
+ PrimitiveArrayManager.allocate(
+ InternalTypeManager.getTSDataType(columnSchema.getType()));
}
System.arraycopy(values, pos, tmpColumns, pos + 1, values.length - pos);
values = tmpColumns;
@@ -1554,7 +1665,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
columnsToReserve.sort(null);
int reserveColumnCursor = 0;
for (int i = 0; i < values.length; i++) {
- if (reserveColumnCursor < columnsToReserve.size() && i !=
columnsToReserve.get(reserveColumnCursor)) {
+ if (reserveColumnCursor < columnsToReserve.size()
+ && i != columnsToReserve.get(reserveColumnCursor)) {
// i is a column to remove
Object[] value = values[i];
for (int j = 0, valueLength = value.length; j < valueLength; j++) {
@@ -1569,7 +1681,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
Object[][] tmpValue = new Object[columnsToReserve.size()][];
- for (int j = 0, columnsToReserveSize = columnsToReserve.size(); j <
columnsToReserveSize;
+ for (int j = 0, columnsToReserveSize = columnsToReserve.size();
+ j < columnsToReserveSize;
j++) {
Integer realPos = columnsToReserve.get(j);
tmpValue[j] = values[realPos];
@@ -1603,11 +1716,110 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
int arrayStart = current % singleArraySize;
int arrayEnd = arrayStart + copyLength;
- tvList.putValues(values[columnIndex][arrayIndex],
bitMap.getRegion(current, current + copyLength), arrayStart, arrayEnd, pos,
singleArraySize);
+ tvList.putValues(
+ values[columnIndex][arrayIndex],
+ bitMap.getRegion(current, current + copyLength),
+ arrayStart,
+ arrayEnd,
+ pos,
+ singleArraySize);
current += copyLength;
}
}
+
+ @Override
+ public void putTo(
+ AlignedTVList tvList,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int pos) {
+ if (end > rowLength) {
+ end = rowLength;
+ }
+
+ tvList.resetColumnInsertedMap();
+ for (int i = 0; i < columnIndices.size(); i++) {
+ int current = start;
+ while (current < end) {
+ // put one array to TVList
+ int arrayIndex = current / singleArraySize;
+ int arrayRemaining = singleArraySize - current % singleArraySize;
+ int copyLength = Math.min(arrayRemaining, end - current);
+
+ int arrayStart = current % singleArraySize;
+ int arrayEnd = arrayStart + copyLength;
+ tvList.putAlignedValues(
+ values[i][arrayIndex],
+ columnIndices.get(i),
+ bitMaps[i].getRegion(current, current + copyLength),
+ arrayStart,
+ arrayEnd,
+ results,
+ pos);
+
+ current += copyLength;
+ }
+ }
+ tvList.markNotInsertedColumns(start, end);
+ }
+
+ @Override
+ public Object[] toTwoDArray() {
+ Object[] twoDArray = new Object[values.length];
+ for (int i = 0; i < values.length; i++) {
+ if (dataTypes[i] == null) {
+ continue;
+ }
+ switch (dataTypes[i]) {
+ case INT32:
+ case DATE:
+ twoDArray[i] = new int[rowLength];
+ break;
+ case INT64:
+ case TIMESTAMP:
+ twoDArray[i] = new Long[rowLength];
+ break;
+ case FLOAT:
+ twoDArray[i] = new Float[rowLength];
+ break;
+ case DOUBLE:
+ twoDArray[i] = new double[rowLength];
+ break;
+ case STRING:
+ case BLOB:
+ case TEXT:
+ twoDArray[i] = new Binary[rowLength];
+ break;
+ case BOOLEAN:
+ twoDArray[i] = new Boolean[rowLength];
+ break;
+ case UNKNOWN:
+ case VECTOR:
+ default:
+ throw new UnSupportedDataTypeException(dataTypes[i].toString());
+ }
+ int arrayIndex = 0;
+ for (; arrayIndex < values[i].length - 1; arrayIndex++) {
+ System.arraycopy(
+ values[i][arrayIndex],
+ 0,
+ twoDArray[i],
+ arrayIndex * singleArraySize,
+ singleArraySize);
+ }
+ System.arraycopy(
+ values[i][arrayIndex],
+ 0,
+ twoDArray[i],
+ arrayIndex * singleArraySize,
+ rowLength % singleArraySize);
+ }
+ return twoDArray;
+ }
}
public static class ColumnMappedValueView implements ValueView {
@@ -1647,8 +1859,8 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
}
@Override
- public void copyTo(ValueView valueView, int colIndex, int thisFrom, int
targetFrom,
- int copyLength) {
+ public void copyTo(
+ ValueView valueView, int colIndex, int thisFrom, int targetFrom, int
copyLength) {
innerValue.copyTo(valueView, colIndex, thisFrom, targetFrom, copyLength);
}
@@ -1690,9 +1902,11 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public long ramSize(MeasurementSchema[] measurementSchemas) {
if (innerValue instanceof TwoDArrayValueView) {
- return InsertNodeMemoryEstimator.sizeOfColumns(((TwoDArrayValueView)
innerValue).values, measurementSchemas, realIndexes);
+ return InsertNodeMemoryEstimator.sizeOfColumns(
+ ((TwoDArrayValueView) innerValue).values, measurementSchemas,
realIndexes);
} else if (innerValue instanceof ThreeDArrayValueView) {
- return InsertNodeMemoryEstimator.sizeOfColumns(((ThreeDArrayValueView)
innerValue).values, measurementSchemas, realIndexes);
+ return InsertNodeMemoryEstimator.sizeOfColumns(
+ ((ThreeDArrayValueView) innerValue).values, measurementSchemas,
realIndexes);
} else {
return innerValue.ramSize(measurementSchemas);
}
@@ -1712,5 +1926,45 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void putTo(TVList tvList, int columnIndex, BitMap bitMap, int
start, int end, int pos) {
innerValue.putTo(tvList, realIndexes.get(columnIndex), bitMap, start,
end, pos);
}
+
+ @Override
+ public void putTo(
+ AlignedTVList tvList,
+ List<Integer> tvListColumnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int pos) {
+ List<Integer> columnIndicesForInner = new
ArrayList<>(innerValue.colLength());
+ // put tvListColumnIndices into the associated places for the inner
columns
+ // if tvListColumnIndices = [0, 1, 2], realIndices = [1, 3, 4],
innerValue.colLength = 5
+ // then columnIndicesForInner = [-1, 0, -1, 1, 2]
+ for (int i = 0; i < innerValue.colLength(); i++) {
+ columnIndicesForInner.add(-1);
+ }
+ for (int i = 0; i < tvListColumnIndices.size(); i++) {
+ columnIndicesForInner.set(realIndexes.get(i),
tvListColumnIndices.get(i));
+ }
+ // similarly for bitmaps
+ BitMap[] bitmapsForInner = new BitMap[innerValue.colLength()];
+ for (int i = 0, realIndexesSize = realIndexes.size(); i <
realIndexesSize; i++) {
+ Integer realIndex = realIndexes.get(i);
+ bitmapsForInner[realIndex] = bitMaps[i];
+ }
+
+ innerValue.putTo(tvList, columnIndicesForInner, bitmapsForInner, start,
end, results, pos);
+ }
+
+ @Override
+ public Object[] toTwoDArray() {
+ Object[] twoDArrayInner = innerValue.toTwoDArray();
+ Object[] twoDArray = new Object[realIndexes.size()];
+ for (int i = 0; i < realIndexes.size(); i++) {
+ twoDArray[i] = twoDArrayInner[realIndexes.get(i)];
+ }
+
+ return twoDArray;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 287939bedea..a43a3a9b028 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1106,7 +1106,7 @@ public class DataRegion implements IDataRegionForQuery {
Map<Long, List<int[]>[]> splitInfo) {
// before is first start point
int before = loc;
- long beforeTime = insertTabletNode.getTimes()[before];
+ long beforeTime = insertTabletNode.getTimes().get(before);
// before time partition
long beforeTimePartition =
TimePartitionUtils.getTimePartitionId(beforeTime);
// init flush time map
@@ -1115,7 +1115,7 @@ public class DataRegion implements IDataRegionForQuery {
// if is sequence
boolean isSequence = false;
while (loc < endOffset) {
- long time = insertTabletNode.getTimes()[loc];
+ long time = insertTabletNode.getTimes().get(loc);
final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
long lastFlushTime;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index e3d54892955..7a5529ce876 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -152,7 +152,13 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
@Override
public abstract void putAlignedTablet(
- TimeView t, ValueView v, List<Integer> columnIndices, BitMap[] bitMaps,
int start, int end, TSStatus[] results);
+ TimeView t,
+ ValueView v,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results);
@Override
public abstract void writeNonAlignedPoint(long insertTime, Object
objectValue);
@@ -163,7 +169,13 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
@Override
public abstract void writeNonAlignedTablet(
- TimeView times, ValueView values, int columnIndex, BitMap bitMap,
TSDataType dataType, int start, int end);
+ TimeView times,
+ ValueView values,
+ int columnIndex,
+ BitMap bitMap,
+ TSDataType dataType,
+ int start,
+ int end);
@Override
public abstract void writeAlignedTablet(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a587cecfee6..9b726718c23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -49,7 +49,6 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -179,10 +178,16 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
@Override
public void putAlignedTablet(
- TimeView t, ValueView v, List<Integer> columnIndices, BitMap[] bitMaps,
int start, int end, TSStatus[] results) {
+ TimeView t,
+ ValueView v,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results) {
int currentRowCount = list.rowCount();
t.putTo(list, null, start, end);
- v.putTo(list, columnIndices, bitMaps, start, end);
+ v.putTo(list, columnIndices, bitMaps, start, end, results,
currentRowCount);
}
@Override
@@ -192,7 +197,13 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
@Override
public void writeNonAlignedTablet(
- TimeView times, ValueView values, int columnIndex, BitMap bitMap,
TSDataType dataType, int start, int end) {
+ TimeView times,
+ ValueView values,
+ int columnIndex,
+ BitMap bitMap,
+ TSDataType dataType,
+ int start,
+ int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
TSDataType.VECTOR);
}
@@ -224,7 +235,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
int end,
TSStatus[] results) {
List<Integer> columnIndices = calculateColumnIndices(schemaList);
- putAlignedTablet(times, valueList, bitMaps, columnIndices, start, end,
results);
+ putAlignedTablet(times, valueList, columnIndices, bitMaps, start, end,
results);
if (TVLIST_SORT_THRESHOLD > 0 && list.rowCount() >= TVLIST_SORT_THRESHOLD)
{
handoverAlignedTvList();
}
@@ -786,52 +797,6 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
return columnIndexList;
}
- /**
- * Check metadata of columns and return array that mapping existed metadata
to index of data
- * column.
- *
- * @param schemaListInInsertPlan Contains all existed schema in InsertPlan.
If some timeseries
- * have been deleted, there will be null in its slot.
- * @return columnIndexArray: schemaList[i] is schema of
columns[columnIndexArray[i]]
- */
- private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
- List<IMeasurementSchema> schemaListInInsertPlan, ValueView columnValues,
BitMap[] bitMaps) {
- Object[] reorderedColumnValues = new Object[schemaList.size()];
- BitMap[] reorderedBitMaps = bitMaps == null ? null : new
BitMap[schemaList.size()];
- for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
- IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
- if (measurementSchema != null) {
- Integer index =
this.measurementIndexMap.get(measurementSchema.getMeasurementName());
- // Index is null means this measurement was not in this AlignedTVList
before.
- // We need to extend a new column in AlignedMemChunk and AlignedTVList.
- // And the reorderedColumnValues should extend one more column for the
new measurement
- if (index == null) {
- index =
- measurementIndexMap.isEmpty()
- ? 0
- : measurementIndexMap.values().stream()
- .mapToInt(Integer::intValue)
- .max()
- .getAsInt()
- + 1;
-
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementName(),
index);
- this.schemaList.add(schemaListInInsertPlan.get(i));
- this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
- reorderedColumnValues =
- Arrays.copyOf(reorderedColumnValues,
reorderedColumnValues.length + 1);
- if (reorderedBitMaps != null) {
- reorderedBitMaps = Arrays.copyOf(reorderedBitMaps,
reorderedBitMaps.length + 1);
- }
- }
- reorderedColumnValues[index] = columnValues[i];
- if (bitMaps != null) {
- reorderedBitMaps[index] = bitMaps[i];
- }
- }
- }
- return new Pair<>(reorderedColumnValues, reorderedBitMaps);
- }
-
private List<Integer> calculateColumnIndices(List<IMeasurementSchema>
schemaListInInsertPlan) {
List<Integer> columnIndexList = new ArrayList<>();
for (IMeasurementSchema measurementSchema : schemaListInInsertPlan) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 2ee6cfe4847..ea1766627a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -61,7 +61,13 @@ public interface IWritableMemChunk extends WALEntryValue {
void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
void putAlignedTablet(
- TimeView t, ValueView v, List<Integer> columnIndices, BitMap[] bitMaps,
int start, int end, TSStatus[] results);
+ TimeView t,
+ ValueView v,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results);
void writeNonAlignedPoint(long insertTime, Object objectValue);
@@ -73,7 +79,13 @@ public interface IWritableMemChunk extends WALEntryValue {
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*/
void writeNonAlignedTablet(
- TimeView times, ValueView values, int columnIndex, BitMap bitMap,
TSDataType dataType, int start, int end);
+ TimeView times,
+ ValueView values,
+ int columnIndex,
+ BitMap bitMap,
+ TSDataType dataType,
+ int start,
+ int end);
void writeAlignedTablet(
TimeView times,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 295919b025f..ec729788eb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -125,7 +125,13 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
@Override
public void writeNonAlignedTablet(
- TimeView times, ValueView valueList, int columnIndex, BitMap bitMap,
TSDataType dataType, int start, int end) {
+ TimeView times,
+ ValueView valueList,
+ int columnIndex,
+ BitMap bitMap,
+ TSDataType dataType,
+ int start,
+ int end) {
int currRowIndex = list.rowCount();
times.putTo(list, bitMap, start, end);
valueList.putTo(list, columnIndex, bitMap, start, end, currRowIndex);
@@ -137,8 +143,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
@Override
public void writeAlignedTablet(
- long[] times,
- Object[] valueList,
+ TimeView times,
+ ValueView valueList,
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
@@ -214,7 +220,13 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
@Override
public void putAlignedTablet(
- long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[]
results) {
+ TimeView t,
+ ValueView v,
+ List<Integer> tvListColumnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE +
schema.getType());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 5a3d2e8e04f..35f4f9efbe9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index ca02552bc38..a0572f79c1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -479,8 +479,8 @@ public class WALNode implements IWALNode {
|| WALManager.getInstance().shouldThrottle();
}
- /**.
- * Snapshot or flush one memTable.
+ /**
+ * . Snapshot or flush one memTable.
*
* @return true if snapshot or flush is executed successfully
*/
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 221d39cdfcc..15eea6a760f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -99,7 +99,7 @@ public class TsFilePlanRedoer {
if (node instanceof InsertRowNode) {
minTimeInNode = ((InsertRowNode) node).getTime();
} else {
- minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
+ minTimeInNode = ((InsertTabletNode) node).getTimes().get(0);
}
if (lastEndTime.isPresent() && lastEndTime.get() >= minTimeInNode) {
return;
@@ -123,7 +123,7 @@ public class TsFilePlanRedoer {
IDeviceID deviceID = pair.getLeft();
Optional<Long> endTimeInResource =
tsFileResource == null ? Optional.empty() :
tsFileResource.getEndTime(deviceID);
- long minTimeOfDevice = relationalInsertTabletNode.getTimes()[start];
+ long minTimeOfDevice =
relationalInsertTabletNode.getTimes().get(start);
if (endTimeInResource.isPresent() && endTimeInResource.get() >=
minTimeOfDevice) {
start = pair.getRight();
continue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
index 1366b0720c5..0100533cb6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.load.converter;
-import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -54,8 +53,7 @@ public class LoadConvertedInsertTabletStatement extends
PipeConvertedInsertTable
measurements[columnIndex],
dataTypes[columnIndex],
dataType);
- columns[columnIndex] =
- ArrayConverter.convert(dataTypes[columnIndex], dataType,
columns[columnIndex]);
+ columns.castTo(columnIndex, dataType);
dataTypes[columnIndex] = dataType;
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index d02ff1d1d54..6b30c4f8b66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -154,9 +154,9 @@ public class TriggerFireVisitor extends
PlanVisitor<TriggerFireResult, TriggerEv
Map<String, Integer> measurementToSchemaIndexMap =
constructMeasurementToSchemaIndexMap(node.getMeasurements(),
measurementSchemas);
- Object[] columns = node.getColumns();
+ Object[] columns = node.getColumns().toTwoDArray();
BitMap[] bitMaps = node.getBitMaps();
- long[] timestamps = node.getTimes();
+ long[] timestamps = node.getTimes().toSingleArray();
int rowCount = node.getRowCount();
boolean hasFailedTrigger = false;
for (Map.Entry<String, List<String>> entry :
triggerNameToMeasurementList.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index d83379bd738..6eab2477d07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -357,8 +356,7 @@ public class CommonUtils {
return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() -
time) <= dataTTL;
}
- public static Object createValueColumnOfDataType(
- TSDataType dataType, TsTableColumnCategory columnCategory, int rowNum) {
+ public static Object createValueColumnOfDataType(TSDataType dataType, int
rowNum) {
Object valueColumn;
switch (dataType) {
case INT32:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 700132ed466..b94b01c2f95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -115,7 +115,8 @@ public class MemUtils {
return memSize;
}
- public static long getBinaryColumnSize(ValueView columns, int columnIndex,
int start, int end, TSStatus[] results) {
+ public static long getBinaryColumnSize(
+ ValueView columns, int columnIndex, int start, int end, TSStatus[]
results) {
long memSize = 0;
memSize += (long) (end - start) *
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
for (int i = start; i < end; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 1293de46cd1..05c5d9145a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -646,13 +646,17 @@ public class QueryDataSetUtils {
}
public static long[][] readTimesFromBufferWithPam(ByteBuffer buffer, int
size) {
- int numOfArray = size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0;
+ int numOfArray =
+ size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE > 0
+ ? 1
+ : 0;
long[][] times = new long[size][];
for (int i = 0; i < numOfArray; i++) {
times[i] = (long[]) PrimitiveArrayManager.allocate(TSDataType.INT64);
}
for (int i = 0; i < size; i++) {
- times[i / PrimitiveArrayManager.ARRAY_SIZE][i %
PrimitiveArrayManager.ARRAY_SIZE] = buffer.getLong();
+ times[i / PrimitiveArrayManager.ARRAY_SIZE][i %
PrimitiveArrayManager.ARRAY_SIZE] =
+ buffer.getLong();
}
return times;
}
@@ -787,7 +791,10 @@ public class QueryDataSetUtils {
public static Object[][] readTabletValuesFromBufferWithPam(
ByteBuffer buffer, TSDataType[] types, int columns, int size) {
Object[][] values = new Object[columns][];
- int arraySize = size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE == 0 ? 0 : 1;
+ int arraySize =
+ size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE == 0
+ ? 0
+ : 1;
for (int i = 0; i < columns; i++) {
values[i] = new Object[arraySize];
for (int j = 0; j < arraySize; j++) {
@@ -796,29 +803,39 @@ public class QueryDataSetUtils {
switch (types[i]) {
case BOOLEAN:
for (int index = 0; index < size; index++) {
- ((boolean[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
BytesUtils.byteToBool(buffer.get());
+ ((boolean[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ BytesUtils.byteToBool(buffer.get());
}
break;
case INT32:
case DATE:
for (int index = 0; index < size; index++) {
- ((int[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
buffer.getInt();
+ ((int[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ buffer.getInt();
}
break;
case INT64:
case TIMESTAMP:
for (int index = 0; index < size; index++) {
- ((long[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
buffer.getLong();
+ ((long[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ buffer.getLong();
}
break;
case FLOAT:
for (int index = 0; index < size; index++) {
- ((float[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
buffer.getFloat();
+ ((float[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ buffer.getFloat();
}
break;
case DOUBLE:
for (int index = 0; index < size; index++) {
- ((double[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
buffer.getDouble();
+ ((double[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ buffer.getDouble();
}
break;
case TEXT:
@@ -828,7 +845,9 @@ public class QueryDataSetUtils {
int binarySize = buffer.getInt();
byte[] binaryValue = new byte[binarySize];
buffer.get(binaryValue);
- ((Binary[]) values[i][index /
PrimitiveArrayManager.ARRAY_SIZE])[index % PrimitiveArrayManager.ARRAY_SIZE] =
new Binary(binaryValue);
+ ((Binary[]) values[i][index / PrimitiveArrayManager.ARRAY_SIZE])
+ [index % PrimitiveArrayManager.ARRAY_SIZE] =
+ new Binary(binaryValue);
}
break;
default:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 0b2e7627e93..a4b6f7ba298 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -61,6 +62,7 @@ import static
org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+@SuppressWarnings("SuspiciousSystemArraycopy")
public abstract class AlignedTVList extends TVList {
// Data types of this aligned tvList
@@ -184,9 +186,25 @@ public abstract class AlignedTVList extends TVList {
return cloneList;
}
+ @TestOnly
+ public synchronized void putAlignedValue(long timestamp, Object[] value) {
+ putAlignedValue(timestamp, value, null);
+ }
+
+ /**
+ * Put a row of aligned values into this AlignedTVList
+ *
+ * @param timestamp the timestamp to put
+ * @param value the values to put
+ * @param columnIndices the association between the input 'value' and the
columns in the
+ * AlignedTVList, i.e., if columnIndices[i] = j, then values[i] should
be put into the j-th
+ * column in the TVList (time column excluded). If null, then values[i]
will be put into the
+ * i-th column. May put -1 in 'columnIndices' to skip some positions in
'values'.
+ */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@Override
- public synchronized void putAlignedValue(long timestamp, Object[] value,
List<Integer> columnIndices) {
+ public synchronized void putAlignedValue(
+ long timestamp, Object[] value, List<Integer> columnIndices) {
checkExpansion();
int arrayIndex = rowCount / ARRAY_SIZE;
int elementIndex = rowCount % ARRAY_SIZE;
@@ -194,9 +212,17 @@ public abstract class AlignedTVList extends TVList {
minTime = Math.min(minTime, timestamp);
timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ if (columnIndices == null) {
+ columnIndices = IntStream.range(0,
value.length).boxed().collect(Collectors.toList());
+ }
+
columnInsertedMap.reset();
for (int i = 0, columnIndicesSize = columnIndices.size(); i <
columnIndicesSize; i++) {
Integer columnIndex = columnIndices.get(i);
+ if (columnIndex < 0) {
+ continue;
+ }
+
Object columnValue = value[i];
columnInsertedMap.mark(columnIndex);
@@ -789,66 +815,79 @@ public abstract class AlignedTVList extends TVList {
time, (TsPrimitiveType) getAlignedValueForQuery(index, floatPrecision,
encodingList));
}
- public void putAlignedValues(
- Object[] value, List<Integer> columnIndices, BitMap[] bitMaps, int
start, int end, TSStatus[] results, int tvListPos) {
- checkExpansion();
- int inputIdx = start;
+ public synchronized void putAlignedValues(
+ Object column,
+ int columnIndex,
+ BitMap bitMap,
+ int start,
+ int end,
+ TSStatus[] results,
+ int tvListPos) {
+ if (columnIndex < 0 || column == null) {
+ // non-exist/invalid column
+ return;
+ }
+
+ columnInsertedMap.mark(columnIndex);
- while (inputIdx < end) {
- int inputRemaining = end - inputIdx;
+ int current = start;
+
+ while (current < end) {
+ int inputRemaining = end - current;
int arrayIdx = tvListPos / ARRAY_SIZE;
int elementIdx = tvListPos % ARRAY_SIZE;
int internalRemaining = ARRAY_SIZE - elementIdx;
- if (internalRemaining >= inputRemaining) {
- // the remaining inputs can fit the last array, copy all remaining
inputs into last array
- arrayCopy(value, columnIndices, inputIdx, arrayIdx, elementIdx,
inputRemaining);
- for (int i = 0; i < inputRemaining; i++) {
- for (int valueIndex = 0; valueIndex < columnIndices.size();
valueIndex++) {
- Integer columnIndex = columnIndices.get(valueIndex);
- if (value[valueIndex] == null
- || bitMaps != null && bitMaps[valueIndex] != null &&
bitMaps[valueIndex].isMarked(inputIdx + i)
- || results != null
- && results[inputIdx + i] != null
- && results[inputIdx + i].code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- markNullValue(columnIndex, arrayIdx, elementIdx + i);
- }
- }
- for (int columnIndex = 0; columnIndex < dataTypes.size();
columnIndex++) {
- if (!columnInsertedMap.isMarked(columnIndex)) {
- markNullValue(columnIndex, arrayIdx, elementIdx + i);
- }
- }
+ int copyLength = Math.min(internalRemaining, inputRemaining);
+ // the remaining inputs can fit the last array, copy all remaining
inputs into last array
+ arrayCopy(column, columnIndex, current, arrayIdx, elementIdx,
copyLength);
+ for (int i = 0; i < copyLength; i++) {
+ if (bitMap.isMarked(current + i)
+ || results != null
+ && results[current + i] != null
+ && results[current + i].code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ markNullValue(columnIndex, arrayIdx, elementIdx + i);
}
- break;
- } else {
- // the remaining inputs cannot fit the last array, fill the last array
and create a new
- // one and enter the next loop
- arrayCopy(value, inputIdx, arrayIdx, elementIdx, internalRemaining);
- for (int i = 0; i < internalRemaining; i++) {
- for (int valueIndex = 0; valueIndex < columnIndices.size();
valueIndex++) {
- Integer columnIndex = columnIndices.get(valueIndex);
- if (value[valueIndex] == null
- || bitMaps != null && bitMaps[valueIndex] != null &&
bitMaps[valueIndex].isMarked(inputIdx + i)
- || results != null
- && results[inputIdx + i] != null
- && results[inputIdx + i].code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- markNullValue(columnIndex, arrayIdx, elementIdx + i);
- }
- }
+ }
- for (int columnIndex = 0; columnIndex < dataTypes.size();
columnIndex++) {
- if (!columnInsertedMap.isMarked(columnIndex)) {
- markNullValue(columnIndex, arrayIdx, elementIdx + i);
- }
- }
+ current += internalRemaining;
+ tvListPos += internalRemaining;
+ }
+ }
+
+ public void resetColumnInsertedMap() {
+ columnInsertedMap.reset();
+ }
+
+ public void markNotInsertedColumns(int start, int end) {
+ for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
+ if (!columnInsertedMap.isMarked(columnIndex)) {
+ for (int i = start; i < end; i++) {
+ int arrayIdx = i / ARRAY_SIZE;
+ int elementIdx = i % ARRAY_SIZE;
+ markNullValue(columnIndex, arrayIdx, elementIdx);
}
- inputIdx += internalRemaining;
- tvListPos += internalRemaining;
}
}
}
+ public synchronized void putAlignedValues(
+ Object[] columns,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int tvListPos) {
+ columnInsertedMap.reset();
+
+ for (int i = 0; i < columnIndices.size(); i++) {
+ putAlignedValues(
+ columns[i], columnIndices.get(i), bitMaps[i], start, end, results,
tvListPos);
+ }
+ markNotInsertedColumns(start, end);
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@Override
public synchronized void putAlignedValues(
@@ -909,12 +948,61 @@ public abstract class AlignedTVList extends TVList {
}
}
- private void arrayCopy(Object[] value, List<Integer> columnIndices, int idx,
int arrayIndex, int elementIndex, int remaining) {
+ private void arrayCopy(
+ Object column, int columnIndex, int idx, int arrayIndex, int
elementIndex, int remaining) {
+ List<Object> columnValues = values.get(columnIndex);
+ switch (dataTypes.get(columnIndex)) {
+ case TEXT:
+ case BLOB:
+ case STRING:
+ Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayT, elementIndex, remaining);
+
+ // update raw size of Text chunk
+ for (int i1 = 0; i1 < remaining; i1++) {
+ memoryBinaryChunkSize[columnIndex] +=
+ arrayT[elementIndex + i1] != null ?
getBinarySize(arrayT[elementIndex + i1]) : 0;
+ }
+ break;
+ case FLOAT:
+ float[] arrayF = ((float[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayF, elementIndex, remaining);
+ break;
+ case INT32:
+ case DATE:
+ int[] arrayI = ((int[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayI, elementIndex, remaining);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ long[] arrayL = ((long[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayL, elementIndex, remaining);
+ break;
+ case DOUBLE:
+ double[] arrayD = ((double[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayD, elementIndex, remaining);
+ break;
+ case BOOLEAN:
+ boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex));
+ System.arraycopy(column, idx, arrayB, elementIndex, remaining);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void arrayCopy(
+ Object[] columns,
+ List<Integer> columnIndices,
+ int idx,
+ int arrayIndex,
+ int elementIndex,
+ int remaining) {
columnInsertedMap.reset();
for (int i = 0, columnIndicesSize = columnIndices.size(); i <
columnIndicesSize; i++) {
Integer columnIndex = columnIndices.get(i);
columnInsertedMap.mark(columnIndex);
- if (value[i] == null) {
+ if (columns[i] == null) {
continue;
}
@@ -924,7 +1012,7 @@ public abstract class AlignedTVList extends TVList {
case BLOB:
case STRING:
Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayT, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayT, elementIndex, remaining);
// update raw size of Text chunk
for (int i1 = 0; i1 < remaining; i1++) {
@@ -934,25 +1022,25 @@ public abstract class AlignedTVList extends TVList {
break;
case FLOAT:
float[] arrayF = ((float[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayF, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayF, elementIndex, remaining);
break;
case INT32:
case DATE:
int[] arrayI = ((int[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayI, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayI, elementIndex, remaining);
break;
case INT64:
case TIMESTAMP:
long[] arrayL = ((long[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayL, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayL, elementIndex, remaining);
break;
case DOUBLE:
double[] arrayD = ((double[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayD, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayD, elementIndex, remaining);
break;
case BOOLEAN:
boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex));
- System.arraycopy(value[i], idx, arrayB, elementIndex, remaining);
+ System.arraycopy(columns[i], idx, arrayB, elementIndex, remaining);
break;
default:
break;
@@ -2102,4 +2190,9 @@ public abstract class AlignedTVList extends TVList {
return outer.isNullValue(valueIndex, validColumnIndex);
}
}
+
+ @Override
+ protected Object getValueArray(int arrayIndex) {
+ throw new UnsupportedOperationException("AlignedTVList does not support
getValueArray");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 98056a441b4..dcd8a2ae844 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import java.lang.reflect.Array;
-import java.util.stream.IntStream;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
@@ -41,6 +39,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -49,6 +48,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
@@ -301,8 +301,7 @@ public abstract class TVList implements WALEntryValue {
time = clonedTime;
timeIdxOffset = start;
// drop null at the end of value array
- int nullCnt =
- dropNullThenUpdateMinMaxTimeAndSorted(time, bitMap, start, end,
timeIdxOffset);
+ int nullCnt = dropNullThenUpdateMinMaxTimeAndSorted(time, bitMap, start,
end, timeIdxOffset);
end -= nullCnt;
} else {
updateMinMaxTimeAndSorted(time, start, end);
@@ -353,7 +352,8 @@ public abstract class TVList implements WALEntryValue {
} else if (valueValueArray instanceof Binary[]) {
return new Binary[valueValueArrayLength];
} else {
- throw new IllegalArgumentException("Unsupported array type: " +
valueValueArray.getClass().getName());
+ throw new IllegalArgumentException(
+ "Unsupported array type: " + valueValueArray.getClass().getName());
}
}
@@ -369,8 +369,7 @@ public abstract class TVList implements WALEntryValue {
System.arraycopy(valueArray, 0, clonedValue, 0, valueArrayLength);
valueArray = clonedValue;
// drop null at the end of value array
- int nullCnt =
- dropNullVal(valueArray, bitMap, start, end);
+ int nullCnt = dropNullVal(valueArray, bitMap, start, end);
end -= nullCnt;
}
@@ -427,7 +426,7 @@ public abstract class TVList implements WALEntryValue {
if (sorted
&& (rowCount == 0
- || (end - start > nullCnt) && time[start - tIdxOffset] >=
getTime(rowCount - 1))) {
+ || (end - start > nullCnt) && time[start - tIdxOffset] >=
getTime(rowCount - 1))) {
seqRowCount += inputSeqRowCount;
}
sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >=
getTime(rowCount - 1));
@@ -435,8 +434,7 @@ public abstract class TVList implements WALEntryValue {
}
// move null values to the end of value array, then return number of null
values
- int dropNullVal(
- Object values, BitMap bitMap, int start, int end) {
+ int dropNullVal(Object values, BitMap bitMap, int start, int end) {
int nullCnt = 0;
for (int vIdx = start; vIdx < end; vIdx++) {
if (bitMap.isMarked(vIdx)) {
@@ -451,7 +449,6 @@ public abstract class TVList implements WALEntryValue {
return nullCnt;
}
-
public void putLong(long time, long value) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@@ -505,7 +502,13 @@ public abstract class TVList implements WALEntryValue {
}
public void putAlignedValues(
- Object[] value, List<Integer> columnIndices, BitMap[] bitMaps, int
start, int end, TSStatus[] results, int pos) {
+ Object[] value,
+ List<Integer> columnIndices,
+ BitMap[] bitMaps,
+ int start,
+ int end,
+ TSStatus[] results,
+ int pos) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
index 5bb36669f70..e5942cabe07 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java
@@ -107,7 +107,6 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.schemaengine.template.TemplateQueryType.SHOW_MEASUREMENTS;
import static org.apache.tsfile.file.metadata.enums.CompressionType.SNAPPY;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -260,9 +259,8 @@ public class StatementGeneratorTest {
insertTabletStatement.getDataType(insertPos));
assertEquals(
columnSchema.getColumnCategory(),
insertTabletStatement.getColumnCategories()[insertPos]);
- final Object[] column1 = (Object[])
insertTabletStatement.getColumns()[insertPos];
- for (Object o : column1) {
- assertNull(o);
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertNull(insertTabletStatement.getColumns().get(i, insertPos));
}
// insert at middle
@@ -278,9 +276,8 @@ public class StatementGeneratorTest {
insertTabletStatement.getDataType(insertPos));
assertEquals(
columnSchema.getColumnCategory(),
insertTabletStatement.getColumnCategories()[insertPos]);
- final long[] column2 = (long[])
insertTabletStatement.getColumns()[insertPos];
- for (long o : column2) {
- assertEquals(0, o);
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertEquals(0, insertTabletStatement.getColumns().get(i, insertPos));
}
// insert at last
@@ -296,9 +293,8 @@ public class StatementGeneratorTest {
insertTabletStatement.getDataType(insertPos));
assertEquals(
columnSchema.getColumnCategory(),
insertTabletStatement.getColumnCategories()[insertPos]);
- final boolean[] column3 = (boolean[])
insertTabletStatement.getColumns()[insertPos];
- for (boolean o : column3) {
- assertFalse(o);
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertFalse((Boolean) insertTabletStatement.getColumns().get(i,
insertPos));
}
// illegal insertion
@@ -336,9 +332,13 @@ public class StatementGeneratorTest {
assertEquals(tsDataTypes[2], insertTabletStatement.getDataType(0));
assertEquals(columnCategories[0],
insertTabletStatement.getColumnCategories()[2]);
assertEquals(columnCategories[2],
insertTabletStatement.getColumnCategories()[0]);
- assertArrayEquals(
- ((double[]) columns[2]), ((double[])
insertTabletStatement.getColumns()[0]), 0.0001);
- assertArrayEquals(((Binary[]) columns[0]), ((Binary[])
insertTabletStatement.getColumns()[2]));
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertEquals(
+ ((double[]) columns[2])[i],
+ (double) insertTabletStatement.getColumns().get(i, 0),
+ 0.0001);
+ assertEquals(((Binary[]) columns[0])[i],
insertTabletStatement.getColumns().get(i, 2));
+ }
assertTrue(insertTabletStatement.getBitMaps()[0].isMarked(2));
assertTrue(insertTabletStatement.getBitMaps()[2].isMarked(0));
@@ -351,9 +351,13 @@ public class StatementGeneratorTest {
assertEquals(tsDataTypes[2], insertTabletStatement.getDataType(1));
assertEquals(columnCategories[1],
insertTabletStatement.getColumnCategories()[0]);
assertEquals(columnCategories[2],
insertTabletStatement.getColumnCategories()[1]);
- assertArrayEquals(((Binary[]) columns[1]), ((Binary[])
insertTabletStatement.getColumns()[0]));
- assertArrayEquals(
- ((double[]) columns[2]), ((double[])
insertTabletStatement.getColumns()[1]), 0.0001);
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertEquals(
+ ((double[]) columns[2])[i],
+ (double) insertTabletStatement.getColumns().get(i, 1),
+ 0.0001);
+ assertEquals(((Binary[]) columns[1])[i], ((Binary[]) columns[0])[i]);
+ }
assertTrue(insertTabletStatement.getBitMaps()[0].isMarked(1));
assertTrue(insertTabletStatement.getBitMaps()[1].isMarked(2));
@@ -366,9 +370,13 @@ public class StatementGeneratorTest {
assertEquals(tsDataTypes[2], insertTabletStatement.getDataType(1));
assertEquals(columnCategories[1],
insertTabletStatement.getColumnCategories()[0]);
assertEquals(columnCategories[2],
insertTabletStatement.getColumnCategories()[1]);
- assertArrayEquals(((Binary[]) columns[1]), ((Binary[])
insertTabletStatement.getColumns()[0]));
- assertArrayEquals(
- ((double[]) columns[2]), ((double[])
insertTabletStatement.getColumns()[1]), 0.0001);
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ assertEquals(
+ ((double[]) columns[2])[i],
+ (double) insertTabletStatement.getColumns().get(i, 1),
+ 0.0001);
+ assertEquals(((Binary[]) columns[1])[i], ((Binary[]) columns[0])[i]);
+ }
assertTrue(insertTabletStatement.getBitMaps()[0].isMarked(1));
assertTrue(insertTabletStatement.getBitMaps()[1].isMarked(2));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 807186c26da..88873fc98c1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -220,7 +220,7 @@ public class WritePlanNodeSplitTest {
Assert.assertEquals(6, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
InsertTabletNode tabletNode = (InsertTabletNode) insertNode;
- Assert.assertEquals(tabletNode.getTimes().length, 2);
+ Assert.assertEquals(tabletNode.getTimes().length(), 2);
TConsensusGroupId regionId =
tabletNode.getDataRegionReplicaSet().getRegionId();
Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()),
regionId.getId());
}
@@ -245,7 +245,7 @@ public class WritePlanNodeSplitTest {
Assert.assertEquals(1, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
- Assert.assertEquals(((InsertTabletNode) insertNode).getTimes().length,
10);
+ Assert.assertEquals(((InsertTabletNode) insertNode).getTimes().length(),
10);
}
}
@@ -294,9 +294,9 @@ public class WritePlanNodeSplitTest {
Assert.assertEquals(6, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
InsertTabletNode tabletNode = (InsertTabletNode) insertNode;
- Assert.assertEquals(2, tabletNode.getTimes().length);
+ Assert.assertEquals(2, tabletNode.getTimes().length());
// keep the time order after split
- Assert.assertTrue(tabletNode.getTimes()[0] < tabletNode.getTimes()[1]);
+ Assert.assertTrue(tabletNode.getTimes().get(0) <
tabletNode.getTimes().get(1));
TConsensusGroupId regionId =
tabletNode.getDataRegionReplicaSet().getRegionId();
Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()),
regionId.getId());
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 3f1df86076a..1c1aba4ffdc 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -1155,8 +1155,9 @@ public class AnalyzerTest {
// attr column should be removed
columns = new Object[] {columns[0], columns[2]};
- assertArrayEquals(columns, insertTabletNode.getColumns());
- assertArrayEquals(StatementTestUtils.genTimestamps(),
insertTabletNode.getTimes());
+ assertArrayEquals(columns, insertTabletNode.getColumns().toTwoDArray());
+ assertArrayEquals(
+ StatementTestUtils.genTimestamps(),
insertTabletNode.getTimes().toSingleArray());
distributionPlanner =
new TableDistributedPlanner(