This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch use_pam_for_insert_tablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/use_pam_for_insert_tablet by
this push:
new 3641826e1e0 multiple fixes
3641826e1e0 is described below
commit 3641826e1e04601e5d9ca503fef8d760fb546dea
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Apr 2 11:12:59 2025 +0800
multiple fixes
---
.../it/session/IoTDBSessionRelationalIT.java | 5 ++
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 3 +-
.../v1/handler/StatementConstructionHandler.java | 3 +-
.../v1/handler/StatementConstructionHandler.java | 3 +-
.../v2/handler/StatementConstructionHandler.java | 3 +-
.../execution/executor/RegionWriteExecutor.java | 3 +
.../operator/process/AbstractIntoOperator.java | 3 +-
.../plan/parser/StatementGenerator.java | 8 +--
.../planner/plan/node/write/InsertTabletNode.java | 48 ++++++++------
.../plan/statement/crud/InsertTabletStatement.java | 75 +++++++++++-----------
.../memtable/AlignedWritableMemChunk.java | 9 +--
.../rescon/memory/PrimitiveArrayManager.java | 4 --
.../java/org/apache/iotdb/db/utils/MemUtils.java | 3 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 12 ++--
.../db/utils/datastructure/AlignedTVList.java | 6 +-
15 files changed, 103 insertions(+), 85 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 7d89b5efee6..464267a9231 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.env.remote.env.RemoteServerEnv;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
@@ -49,6 +50,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -1848,6 +1850,9 @@ public class IoTDBSessionRelationalIT {
"INSERT INTO remove_attr_col (time, tag1, attr1, m1) VALUES (10,
'tag:1', 'attr:10', 10.0)");
// check WAL
+ if (EnvFactory.getEnv() instanceof RemoteServerEnv) {
+ return;
+ }
for (DataNodeWrapper dataNodeWrapper :
EnvFactory.getEnv().getDataNodeWrapperList()) {
String walNodeDir = dataNodeWrapper.getWalDir() + File.separator + "0";
File[] walFiles = new File(walNodeDir).listFiles(f ->
f.getName().endsWith(".wal"));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index a1e652cbd5a..bf8b17c6823 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -259,7 +259,8 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
}
insertStatement.setDataTypes(dataTypes);
- insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
+ insertStatement.setColumns(
+ new TwoDArrayValueView(columns, insertStatement::getDataTypes,
rowSize));
insertStatement.setColumnCategories(columnCategories);
return insertStatement;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
index 5d993e5428f..25856242aff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/handler/StatementConstructionHandler.java
@@ -169,7 +169,8 @@ public class StatementConstructionHandler {
throw new IllegalArgumentException("Invalid input: " +
rawDataType.get(columnIndex));
}
}
- insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
+ insertStatement.setColumns(
+ new TwoDArrayValueView(columns, insertStatement::getDataTypes,
rowSize));
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(rowSize);
insertStatement.setDataTypes(dataTypes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
index 61d4c7dc406..04fe0c045cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/handler/StatementConstructionHandler.java
@@ -173,7 +173,8 @@ public class StatementConstructionHandler {
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
insertStatement.setDataTypes(dataTypes);
- insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
+ insertStatement.setColumns(
+ new TwoDArrayValueView(columns, insertStatement::getDataTypes,
rowSize));
insertStatement.setAligned(insertTabletRequest.getIsAligned());
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
index f9e5f1bb604..890aeb15cad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java
@@ -183,7 +183,8 @@ public class StatementConstructionHandler {
insertStatement.setBitMaps(bitMaps);
insertStatement.setRowCount(insertTabletRequest.getTimestamps().size());
insertStatement.setDataTypes(dataTypes);
- insertStatement.setColumns(new TwoDArrayValueView(columns, dataTypes,
rowSize));
+ insertStatement.setColumns(
+ new TwoDArrayValueView(columns, insertStatement::getDataTypes,
rowSize));
insertStatement.setAligned(insertTabletRequest.getIsAligned());
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index fc3013b6240..2e3c5a742c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -324,6 +324,9 @@ public class RegionWriteExecutor {
}
triggerCostTime += (System.nanoTime() - startTime);
}
+ if (insertNode instanceof InsertTabletNode) {
+ ((InsertTabletNode) insertNode).decRefCount();
+ }
PERFORMANCE_OVERVIEW_METRICS.recordScheduleTriggerCost(triggerCostTime);
return status;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
index 15afe24a90c..3423326b499 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractIntoOperator.java
@@ -542,7 +542,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
insertTabletStatement.setTimes(new SingleArrayTimeView(times));
insertTabletStatement.setBitMaps(bitMaps);
- insertTabletStatement.setColumns(new TwoDArrayValueView(columns,
dataTypes, rowCount));
+ insertTabletStatement.setColumns(
+ new TwoDArrayValueView(columns, insertTabletStatement::getDataTypes,
rowCount));
return insertTabletStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 50cddde149a..898196361f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -348,15 +348,15 @@ public class StatementGenerator {
new TwoDArrayValueView(
QueryDataSetUtils.readTabletValuesFromBuffer(
valueBuffer, dataTypes, dataTypes.length, rowSize),
- dataTypes,
+ insertTabletStatement::getDataTypes,
rowSize));
} else {
long[][] timestamps =
QueryDataSetUtils.readTimesFromBufferWithPam(timeBuffer, rowSize);
if (timestamps.length != 0) {
TimestampPrecisionUtils.checkTimestampPrecision(timestamps[0][0]);
TimestampPrecisionUtils.checkTimestampPrecision(
- timestamps[rowSize / PrimitiveArrayManager.ARRAY_SIZE][
- rowSize % PrimitiveArrayManager.ARRAY_SIZE]);
+ timestamps[(rowSize - 1) / PrimitiveArrayManager.ARRAY_SIZE][
+ (rowSize - 1) % PrimitiveArrayManager.ARRAY_SIZE]);
}
insertTabletStatement.setTimes(
new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, timestamps,
rowSize));
@@ -364,7 +364,7 @@ public class StatementGenerator {
new ThreeDArrayValueView(
QueryDataSetUtils.readTabletValuesFromBufferWithPam(
valueBuffer, dataTypes, dataTypes.length, rowSize),
- dataTypes,
+ insertTabletStatement::getDataTypes,
rowSize,
PrimitiveArrayManager.ARRAY_SIZE));
insertTabletStatement.setRefCount(new AtomicInteger(1));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 4b082b68746..2da909e6471 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -136,7 +136,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
super(id, devicePath, isAligned, measurements, dataTypes);
this.times = new SingleArrayTimeView(times);
this.bitMaps = bitMaps;
- this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ this.columns = new TwoDArrayValueView(columns, this::getDataTypes,
rowCount);
this.rowCount = rowCount;
}
@@ -155,7 +155,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
this.measurementSchemas = measurementSchemas;
this.times = new SingleArrayTimeView(times);
this.bitMaps = bitMaps;
- this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ this.columns = new TwoDArrayValueView(columns, this::getDataTypes,
rowCount);
this.rowCount = rowCount;
}
@@ -212,7 +212,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
if (dataTypes == null || rowCount == 0) {
throw new IllegalArgumentException("dataTypes and rowCount must be set
first");
}
- this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ this.columns = new TwoDArrayValueView(columns, this::getDataTypes,
rowCount);
}
public int getRowCount() {
@@ -375,24 +375,29 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
values,
subTimes.length);
} else {
- int numOfArrays = PrimitiveArrayManager.numOfArrays(count);
+ int numOfArrays = PrimitiveArrayManager.getArrayRowCount(count);
long[][] subTimes = new long[numOfArrays][];
for (int i = 0; i < numOfArrays; i++) {
subTimes[i] = (long[])
PrimitiveArrayManager.allocate(TSDataType.INT64);
}
Object[][] values = initTabletValuesWithPam(dataTypes.length, count,
dataTypes);
- return new InsertTabletNode(
- getPlanNodeId(),
- targetPath,
- isAligned,
- measurements,
- dataTypes,
- measurementSchemas,
- new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE, subTimes,
count),
- bitMaps,
- new ThreeDArrayValueView(values, dataTypes, count,
PrimitiveArrayManager.ARRAY_SIZE),
- count,
- new AtomicInteger(1));
+ InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ getPlanNodeId(),
+ targetPath,
+ isAligned,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ new MultiArrayTimeView(PrimitiveArrayManager.ARRAY_SIZE,
subTimes, count),
+ bitMaps,
+ null,
+ count,
+ new AtomicInteger(1));
+ insertTabletNode.setColumns(
+ new ThreeDArrayValueView(
+ values, insertTabletNode::getDataTypes, count,
PrimitiveArrayManager.ARRAY_SIZE));
+ return insertTabletNode;
}
}
@@ -485,8 +490,11 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
protected Object[][] initTabletValuesWithPam(
int columnSize, int rowSize, TSDataType[] dataTypes) {
Object[][] values = new Object[columnSize][];
- int numOfArrays = PrimitiveArrayManager.numOfArrays(rowSize);
+ int numOfArrays = PrimitiveArrayManager.getArrayRowCount(rowSize);
for (int i = 0; i < values.length; i++) {
+ if (dataTypes[i] == null) {
+ continue;
+ }
for (int j = 0; j < numOfArrays; j++) {
values[i][j] = PrimitiveArrayManager.allocate(dataTypes[i]);
}
@@ -731,7 +739,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
new TwoDArrayValueView(
QueryDataSetUtils.readTabletValuesFromBuffer(
buffer, dataTypes, measurementSize, rowCount),
- dataTypes,
+ this::getDataTypes,
rowCount);
isAligned = buffer.get() == 1;
}
@@ -906,7 +914,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
new TwoDArrayValueView(
QueryDataSetUtils.readTabletValuesFromStream(
stream, dataTypes, measurementSize, rowCount),
- dataTypes,
+ this::getDataTypes,
rowCount);
isAligned = stream.readByte() == 1;
}
@@ -949,7 +957,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
new TwoDArrayValueView(
QueryDataSetUtils.readTabletValuesFromBuffer(
buffer, dataTypes, measurementSize, rowCount),
- dataTypes,
+ this::getDataTypes,
rowCount);
isAligned = buffer.get() == 1;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index a91b0f8dd02..8970ed35839 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -70,6 +70,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
public class InsertTabletStatement extends InsertBaseStatement implements
ISchemaValidation {
@@ -143,7 +144,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
if (dataTypes == null || rowCount == 0) {
throw new IllegalArgumentException("dataTypes and rowCount must be set
first");
}
- this.columns = new TwoDArrayValueView(columns, dataTypes, rowCount);
+ this.columns = new TwoDArrayValueView(columns, this::getDataTypes,
rowCount);
}
public BitMap[] getBitMaps() {
@@ -748,7 +749,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
int arrayEnd = arrayStart + copyLength;
tvList.putTimes(
timestamps[arrayIndex],
- bitMap.getRegion(current, current + copyLength),
+ bitMap == null ? null : bitMap.getRegion(current, current +
copyLength),
arrayStart,
arrayEnd);
@@ -864,11 +865,11 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public static class TwoDArrayValueView implements ValueView {
- private final TSDataType[] dataTypes;
+ private final Supplier<TSDataType[]> dataTypes;
private Object[] values;
private final int rowLength;
- public TwoDArrayValueView(Object[] values, TSDataType[] dataTypes, int
rowLength) {
+ public TwoDArrayValueView(Object[] values, Supplier<TSDataType[]>
dataTypes, int rowLength) {
this.values = values;
this.dataTypes = dataTypes;
this.rowLength = rowLength;
@@ -886,7 +887,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public Object get(int rowIndex, int colIndex) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
return ((int[]) values[colIndex])[rowIndex];
@@ -906,13 +907,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
case VECTOR:
case UNKNOWN:
default:
- throw new IllegalArgumentException(dataTypes[colIndex].toString());
+ throw new
IllegalArgumentException(dataTypes.get()[colIndex].toString());
}
}
@Override
public void set(int rowIndex, int colIndex, Object value) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
((int[]) values[colIndex])[rowIndex] = ((int) value);
@@ -938,7 +939,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
case VECTOR:
case UNKNOWN:
default:
- throw new IllegalArgumentException(dataTypes[colIndex].toString());
+ throw new
IllegalArgumentException(dataTypes.get()[colIndex].toString());
}
}
@@ -960,7 +961,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public void serializeColumn(int colIndex, ByteBuffer buffer) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) values[colIndex];
@@ -1007,13 +1008,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@Override
public void serializeColumn(int colIndex, DataOutputStream stream) throws
IOException {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) values[colIndex];
@@ -1060,13 +1061,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@Override
public void serializeColumn(int colIndex, IWALByteBufferView buffer, int
start, int end) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
int[] intValues = (int[]) values[colIndex];
@@ -1114,18 +1115,18 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@Override
public TSDataType[] dataTypes() {
- return dataTypes;
+ return dataTypes.get();
}
@Override
public void castTo(int colIndex, TSDataType newType) {
- values[colIndex] = newType.castFromArray(dataTypes[colIndex],
values[colIndex]);
+ values[colIndex] = newType.castFromArray(dataTypes.get()[colIndex],
values[colIndex]);
}
@Override
@@ -1185,13 +1186,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@SuppressWarnings("SuspiciousSystemArraycopy")
public static class ThreeDArrayValueView implements ValueView {
- private final TSDataType[] dataTypes;
+ private final Supplier<TSDataType[]> dataTypes;
private Object[][] values;
private final int rowLength;
private final int singleArraySize;
public ThreeDArrayValueView(
- Object[][] values, TSDataType[] dataTypes, int rowLength, int
singleArraySize) {
+ Object[][] values, Supplier<TSDataType[]> dataTypes, int rowLength,
int singleArraySize) {
this.values = values;
this.dataTypes = dataTypes;
this.rowLength = rowLength;
@@ -1210,7 +1211,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public Object get(int rowIndex, int colIndex) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
return ((int[]) values[colIndex][rowIndex /
singleArraySize])[rowIndex % singleArraySize];
@@ -1235,13 +1236,13 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
case VECTOR:
case UNKNOWN:
default:
- throw new IllegalArgumentException(dataTypes[colIndex].toString());
+ throw new
IllegalArgumentException(dataTypes.get()[colIndex].toString());
}
}
@Override
public void set(int rowIndex, int colIndex, Object value) {
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
((int[]) values[colIndex][rowIndex / singleArraySize])[rowIndex %
singleArraySize] =
@@ -1273,7 +1274,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
case VECTOR:
case UNKNOWN:
default:
- throw new IllegalArgumentException(dataTypes[colIndex].toString());
+ throw new
IllegalArgumentException(dataTypes.get()[colIndex].toString());
}
}
@@ -1324,7 +1325,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public void serializeColumn(int colIndex, ByteBuffer buffer) {
int rowIndex = 0;
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
for (Object o : values[colIndex]) {
@@ -1407,14 +1408,14 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@Override
public void serializeColumn(int colIndex, DataOutputStream stream) throws
IOException {
int rowIndex = 0;
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
for (Object o : values[colIndex]) {
@@ -1497,7 +1498,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@@ -1505,7 +1506,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void serializeColumn(int colIndex, IWALByteBufferView buffer, int
start, int end) {
int serializedCnt = 0;
Object[] column = values[colIndex];
- switch (dataTypes[colIndex]) {
+ switch (dataTypes.get()[colIndex]) {
case INT32:
case DATE:
for (int i = start / singleArraySize; i < column.length; i++) {
@@ -1613,21 +1614,23 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
break;
default:
throw new UnSupportedDataTypeException(
- String.format(DATATYPE_UNSUPPORTED, dataTypes[colIndex]));
+ String.format(DATATYPE_UNSUPPORTED, dataTypes.get()[colIndex]));
}
}
@Override
public TSDataType[] dataTypes() {
- return dataTypes;
+ return dataTypes.get();
}
@Override
public void castTo(int colIndex, TSDataType newType) {
for (int i = 0; i < values[colIndex].length; i++) {
Object originalArray = values[colIndex][i];
- values[colIndex][i] = newType.castFromArray(dataTypes[colIndex],
originalArray);
- release(originalArray);
+ values[colIndex][i] = newType.castFromArray(dataTypes.get()[colIndex],
originalArray);
+ if (originalArray != values[colIndex][i]) {
+ release(originalArray);
+ }
}
}
@@ -1718,7 +1721,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
int arrayEnd = arrayStart + copyLength;
tvList.putValues(
values[columnIndex][arrayIndex],
- bitMap.getRegion(current, current + copyLength),
+ bitMap == null ? null : bitMap.getRegion(current, current +
copyLength),
arrayStart,
arrayEnd,
pos,
@@ -1755,7 +1758,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
tvList.putAlignedValues(
values[i][arrayIndex],
columnIndices.get(i),
- bitMaps[i].getRegion(current, current + copyLength),
+ bitMaps[i] == null ? null : bitMaps[i].getRegion(current,
current + copyLength),
arrayStart,
arrayEnd,
results,
@@ -1771,10 +1774,10 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public Object[] toTwoDArray() {
Object[] twoDArray = new Object[values.length];
for (int i = 0; i < values.length; i++) {
- if (dataTypes[i] == null) {
+ if (dataTypes.get()[i] == null) {
continue;
}
- switch (dataTypes[i]) {
+ switch (dataTypes.get()[i]) {
case INT32:
case DATE:
twoDArray[i] = new int[rowLength];
@@ -1800,7 +1803,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
case UNKNOWN:
case VECTOR:
default:
- throw new UnSupportedDataTypeException(dataTypes[i].toString());
+ throw new
UnSupportedDataTypeException(dataTypes.get()[i].toString());
}
int arrayIndex = 0;
for (; arrayIndex < values[i].length - 1; arrayIndex++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 9b726718c23..afc5db33ccb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -507,10 +507,11 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
case TEXT:
case STRING:
case BLOB:
- alignedChunkWriter.writeByColumn(
- time,
- isNull ? null : list.getBinaryByValueIndex(originRowIndex,
columnIndex),
- isNull);
+ Binary value = list.getBinaryByValueIndex(originRowIndex,
columnIndex);
+ if (!isNull && value == null) {
+ System.out.println("");
+ }
+ alignedChunkWriter.writeByColumn(time, isNull ? null : value,
isNull);
break;
default:
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index 65a040a1d89..f4bc88db2ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -126,10 +126,6 @@ public class PrimitiveArrayManager {
// Empty constructor
}
- public static int numOfArrays(int numOfElements) {
- return numOfElements / ARRAY_SIZE + numOfElements % ARRAY_SIZE > 0 ? 1 : 0;
- }
-
/**
* Get or allocate primitive data lists according to type.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index b94b01c2f95..96364c8d9ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -122,7 +122,8 @@ public class MemUtils {
for (int i = start; i < end; i++) {
if (results == null
|| results[i] == null
- || results[i].code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ || results[i].code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && columns.get(i, columnIndex) != null) {
memSize += RamUsageEstimator.sizeOf(((Binary) columns.get(i,
columnIndex)).getValues());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 05c5d9145a0..0b55962f0f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -646,11 +646,8 @@ public class QueryDataSetUtils {
}
public static long[][] readTimesFromBufferWithPam(ByteBuffer buffer, int
size) {
- int numOfArray =
- size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE > 0
- ? 1
- : 0;
- long[][] times = new long[size][];
+ int numOfArray = PrimitiveArrayManager.getArrayRowCount(size);
+ long[][] times = new long[numOfArray][];
for (int i = 0; i < numOfArray; i++) {
times[i] = (long[]) PrimitiveArrayManager.allocate(TSDataType.INT64);
}
@@ -792,9 +789,8 @@ public class QueryDataSetUtils {
ByteBuffer buffer, TSDataType[] types, int columns, int size) {
Object[][] values = new Object[columns][];
int arraySize =
- size / PrimitiveArrayManager.ARRAY_SIZE + size %
PrimitiveArrayManager.ARRAY_SIZE == 0
- ? 0
- : 1;
+ size / PrimitiveArrayManager.ARRAY_SIZE
+ + (size % PrimitiveArrayManager.ARRAY_SIZE == 0 ? 0 : 1);
for (int i = 0; i < columns; i++) {
values[i] = new Object[arraySize];
for (int j = 0; j < arraySize; j++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index a4b6f7ba298..dda343c3de1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -842,7 +842,7 @@ public abstract class AlignedTVList extends TVList {
// the remaining inputs can fit the last array, copy all remaining
inputs into last array
arrayCopy(column, columnIndex, current, arrayIdx, elementIdx,
copyLength);
for (int i = 0; i < copyLength; i++) {
- if (bitMap.isMarked(current + i)
+ if (bitMap != null && bitMap.isMarked(current + i)
|| results != null
&& results[current + i] != null
&& results[current + i].code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -850,8 +850,8 @@ public abstract class AlignedTVList extends TVList {
}
}
- current += internalRemaining;
- tvListPos += internalRemaining;
+ current += copyLength;
+ tvListPos += copyLength;
}
}