This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7f0ac972839 Pipe: Fix NPE caused by forced type conversion and fix
IoTDBipipeTypeConversionISessionIT of table model (#14667)
7f0ac972839 is described below
commit 7f0ac972839fa919e9cdc7c3eff814c0d249377f
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Jan 10 15:57:08 2025 +0800
Pipe: Fix NPE caused by forced type conversion and fix
IoTDBipipeTypeConversionISessionIT of table model (#14667)
---
.../IoTDBPipeTypeConversionISessionIT.java | 93 +++++++++++++++-------
.../statement/PipeConvertedInsertRowStatement.java | 30 ++++++-
.../PipeConvertedInsertTabletStatement.java | 48 +++++++++--
...leStatementDataTypeConvertExecutionVisitor.java | 3 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 3 +-
.../plan/statement/crud/InsertBaseStatement.java | 6 +-
.../plan/statement/crud/InsertRowStatement.java | 18 ++++-
7 files changed, 159 insertions(+), 42 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
index 4145fc9a5d2..b3e40bb8a10 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeTypeConversionISessionIT.java
@@ -56,12 +56,13 @@ import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2TableModel.class})
public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTestIT {
- private static final int generateDataSize = 100;
+ private static final int generateDataSize = 1000;
@Test
public void insertTablet() {
@@ -90,8 +91,8 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
param.append(schema.getMeasurementName());
param.append(',');
}
- sql = sql + param.substring(0, param.length() - 1);
- sql = sql + " from " + tableName + " ORDER BY time ASC";
+
+ sql = sql + param + "time from " + tableName + " ORDER BY time ASC";
session.executeNonQueryStatement("use test");
return session.executeQueryStatement(sql);
}
@@ -106,7 +107,8 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
createDatabaseAndTable(measurementSchemas, false, tablet.getColumnTypes(),
receiverEnv);
try (ITableSession senderSession = senderEnv.getTableSessionConnection();
ITableSession receiverSession =
receiverEnv.getTableSessionConnection()) {
-
+ senderSession.executeNonQueryStatement("use test");
+ receiverSession.executeNonQueryStatement("use test");
if (isTsFile) {
// Send TsFile data to receiver
executeDataWriteOperation.accept(senderSession, receiverSession,
tablet);
@@ -121,7 +123,7 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
}
// Verify receiver data
- long timeoutSeconds = 30;
+ long timeoutSeconds = 600;
List<List<Object>> expectedValues =
generateTabletResultSetForTable(tablet, measurementSchemas);
await()
@@ -140,10 +142,10 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
fail(e.getMessage());
}
});
- tablet.reset();
} catch (Exception e) {
fail(e.getMessage());
}
+ tablet.reset();
}
private void createDatabaseAndTable(
@@ -174,16 +176,15 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
String sql =
String.format(
"create pipe test"
- + " with source
('source'='iotdb-source','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
+ + " with source ('source'='iotdb-source','realtime.mode'='%s')"
+ " with processor ('processor'='do-nothing-processor')"
+ " with sink
('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
isTSFile ? "file" : "forced-log",
- !isTSFile,
- isTSFile,
receiverEnv.getIP(),
receiverEnv.getPort(),
isTSFile ? "tsfile" : "tablet");
- TestUtils.tryExecuteNonQueriesWithRetry(senderEnv,
Collections.singletonList(sql));
+ TestUtils.tryExecuteNonQueriesWithRetry(
+ null, BaseEnv.TABLE_SQL_DIALECT, senderEnv,
Collections.singletonList(sql));
}
private void validateResultSet(
@@ -193,11 +194,13 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
List<Field> fields = record.getFields();
-
- assertEquals(record.getTimestamp(), timestamps[index]);
List<Object> rowValues = values.get(index++);
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
+ if (field.getDataType() == null) {
+ assertNull(rowValues.get(i));
+ continue;
+ }
switch (field.getDataType()) {
case INT64:
case TIMESTAMP:
@@ -207,11 +210,9 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
assertEquals(field.getDateV(), rowValues.get(i));
break;
case BLOB:
- assertEquals(field.getBinaryV(), rowValues.get(i));
- break;
case TEXT:
case STRING:
- assertEquals(field.getStringValue(), rowValues.get(i));
+ assertEquals(field.getBinaryV(), rowValues.get(i));
break;
case INT32:
assertEquals(field.getIntV(), (int) rowValues.get(i));
@@ -283,13 +284,11 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
}
}
- private long[] createTestDataForTimestamp() {
+ private void createTestDataForTimeColumn(Tablet tablet) {
long time = new Date().getTime();
- long[] data = new long[generateDataSize];
- for (int i = 0; i < data.length; i++) {
- data[i] = time++;
+ for (int i = 0; i < generateDataSize; i++) {
+ tablet.addTimestamp(i, time++);
}
- return data;
}
private void createTestDataForDate(Tablet tablet, int j) {
@@ -346,19 +345,35 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
switch (sourceType) {
case INT64:
case TIMESTAMP:
- value = ValueConverter.convert(sourceType, targetType, ((long[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((long[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case INT32:
- value = ValueConverter.convert(sourceType, targetType, ((int[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((int[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case DOUBLE:
- value = ValueConverter.convert(sourceType, targetType, ((double[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((double[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case FLOAT:
- value = ValueConverter.convert(sourceType, targetType, ((float[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((float[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case DATE:
@@ -366,24 +381,39 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
ValueConverter.convert(
sourceType,
targetType,
- DateUtils.parseDateExpressionToInt(((LocalDate[])
values[j])[i]));
+ tablet.bitMaps[j].isMarked(i)
+ ? null
+ : DateUtils.parseDateExpressionToInt(((LocalDate[])
values[j])[i]));
insertRecord.add(convert(value, targetType));
break;
case TEXT:
case STRING:
- value = ValueConverter.convert(sourceType, targetType, ((Binary[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((Binary[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case BLOB:
- value = ValueConverter.convert(sourceType, targetType, ((Binary[])
values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((Binary[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
case BOOLEAN:
- value = ValueConverter.convert(sourceType, targetType,
((boolean[]) values[j])[i]);
+ value =
+ ValueConverter.convert(
+ sourceType,
+ targetType,
+ tablet.bitMaps[j].isMarked(i) ? null : ((boolean[])
values[j])[i]);
insertRecord.add(convert(value, targetType));
break;
}
}
+ insertRecord.add(tablet.timestamps[i]);
insertRecords.add(insertRecord);
}
@@ -391,12 +421,15 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
}
private Object convert(Object value, TSDataType targetType) {
+ if (value == null) {
+ return null;
+ }
switch (targetType) {
case DATE:
return DateUtils.parseIntToLocalDate((Integer) value);
case TEXT:
case STRING:
- return new String(((Binary) value).getValues(),
TSFileConfig.STRING_CHARSET);
+ return value;
}
return value;
}
@@ -417,7 +450,7 @@ public class IoTDBPipeTypeConversionISessionIT extends
AbstractPipeTableModelTes
columnTypes,
generateDataSize);
tablet.initBitMaps();
- tablet.timestamps = createTestDataForTimestamp();
+ createTestDataForTimeColumn(tablet);
for (int i = 0; i < pairs.size(); i++) {
MeasurementSchema schema = pairs.get(i).left;
switch (schema.getType()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
index f249a773455..edf8b636ecf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java
@@ -28,10 +28,14 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.tsfile.annotations.TableModel;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
public class PipeConvertedInsertRowStatement extends InsertRowStatement {
@@ -43,7 +47,6 @@ public class PipeConvertedInsertRowStatement extends
InsertRowStatement {
// Statement
isDebug = insertRowStatement.isDebug();
// InsertBaseStatement
- insertRowStatement.removeAllFailedMeasurementMarks();
devicePath = insertRowStatement.getDevicePath();
isAligned = insertRowStatement.isAligned();
measurementSchemas = insertRowStatement.getMeasurementSchemas();
@@ -59,6 +62,31 @@ public class PipeConvertedInsertRowStatement extends
InsertRowStatement {
values = insertRowStatement.getValues();
isNeedInferType = insertRowStatement.isNeedInferType();
deviceID = insertRowStatement.getRawTableDeviceID();
+
+ // To ensure that the measurement remains unchanged during the WAL writing
process, the array
+ // needs to be copied before the failed Measurement mark can be deleted.
+ final MeasurementSchema[] measurementSchemas =
insertRowStatement.getMeasurementSchemas();
+ if (measurementSchemas != null) {
+ this.measurementSchemas = Arrays.copyOf(measurementSchemas,
measurementSchemas.length);
+ }
+
+ final String[] measurements = insertRowStatement.getMeasurements();
+ if (measurements != null) {
+ this.measurements = Arrays.copyOf(measurements, measurements.length);
+ }
+
+ final TSDataType[] dataTypes = insertRowStatement.getDataTypes();
+ if (dataTypes != null) {
+ this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+ }
+
+ final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+ insertRowStatement.getFailedMeasurementInfoMap();
+ if (failedMeasurementIndex2Info != null) {
+ this.failedMeasurementIndex2Info = new
HashMap<>(failedMeasurementIndex2Info);
+ }
+
+ removeAllFailedMeasurementMarks();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
index adc19ae7e0f..eee47a5b0b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java
@@ -24,25 +24,27 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
import org.apache.tsfile.annotations.TableModel;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
public class PipeConvertedInsertTabletStatement extends InsertTabletStatement {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConvertedInsertTabletStatement.class);
- public PipeConvertedInsertTabletStatement(final InsertTabletStatement
insertTabletStatement) {
+ public PipeConvertedInsertTabletStatement(
+ final InsertTabletStatement insertTabletStatement, boolean
isCopyMeasurement) {
super();
// Statement
isDebug = insertTabletStatement.isDebug();
// InsertBaseStatement
- insertTabletStatement.removeAllFailedMeasurementMarks();
devicePath = insertTabletStatement.getDevicePath();
isAligned = insertTabletStatement.isAligned();
- measurementSchemas = insertTabletStatement.getMeasurementSchemas();
- measurements = insertTabletStatement.getMeasurements();
- dataTypes = insertTabletStatement.getDataTypes();
columnCategories = insertTabletStatement.getColumnCategories();
idColumnIndices = insertTabletStatement.getIdColumnIndices();
attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
@@ -55,6 +57,42 @@ public class PipeConvertedInsertTabletStatement extends
InsertTabletStatement {
deviceIDs = insertTabletStatement.getRawTableDeviceIDs();
singleDevice = insertTabletStatement.isSingleDevice();
rowCount = insertTabletStatement.getRowCount();
+
+ // To ensure that the measurement remains unchanged during the WAL writing
process, the array
+ // needs to be copied before the failed Measurement mark can be deleted.
+ if (isCopyMeasurement) {
+ final MeasurementSchema[] measurementSchemas =
insertTabletStatement.getMeasurementSchemas();
+ if (measurementSchemas != null) {
+ this.measurementSchemas = Arrays.copyOf(measurementSchemas,
measurementSchemas.length);
+ }
+
+ final String[] measurements = insertTabletStatement.getMeasurements();
+ if (measurements != null) {
+ this.measurements = Arrays.copyOf(measurements, measurements.length);
+ }
+
+ final TSDataType[] dataTypes = insertTabletStatement.getDataTypes();
+ if (dataTypes != null) {
+ this.dataTypes = Arrays.copyOf(dataTypes, dataTypes.length);
+ }
+
+ final Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info =
+ insertTabletStatement.getFailedMeasurementInfoMap();
+ if (failedMeasurementIndex2Info != null) {
+ this.failedMeasurementIndex2Info = new
HashMap<>(failedMeasurementIndex2Info);
+ }
+ } else {
+ this.measurementSchemas = insertTabletStatement.getMeasurementSchemas();
+ this.measurements = insertTabletStatement.getMeasurements();
+ this.dataTypes = insertTabletStatement.getDataTypes();
+ this.failedMeasurementIndex2Info =
insertTabletStatement.getFailedMeasurementInfoMap();
+ }
+
+ removeAllFailedMeasurementMarks();
+ }
+
+ public PipeConvertedInsertTabletStatement(final InsertTabletStatement
insertTabletStatement) {
+ this(insertTabletStatement, true);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index 11952814da0..11858853a2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -141,7 +141,8 @@ public class
PipeTableStatementDataTypeConvertExecutionVisitor
rawTabletInsertionEvent.convertToTablet(),
rawTabletInsertionEvent.isAligned(),
databaseName)
- .constructStatement());
+ .constructStatement(),
+ false);
TSStatus result;
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index eef40c714dc..ed47e519b00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -106,7 +106,8 @@ public class
PipeTreeStatementDataTypeConvertExecutionVisitor
new PipeConvertedInsertTabletStatement(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight())
- .constructStatement());
+ .constructStatement(),
+ false);
TSStatus result;
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index eb6da0bc2ff..7ae7b023200 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -341,6 +341,10 @@ public abstract class InsertBaseStatement extends
Statement {
.collect(Collectors.toList());
}
+ public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+ return failedMeasurementIndex2Info;
+ }
+
public List<Exception> getFailedExceptions() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
@@ -364,7 +368,7 @@ public abstract class InsertBaseStatement extends Statement
{
.collect(Collectors.toList());
}
- protected static class FailedMeasurementInfo {
+ public static class FailedMeasurementInfo {
protected String measurement;
protected TSDataType dataType;
protected Object value;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index 85f82ff447d..d33a83b0c32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -276,13 +276,25 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
}
failedMeasurementIndex2Info.forEach(
(index, info) -> {
- measurements[index] = info.getMeasurement();
- dataTypes[index] = info.getDataType();
- values[index] = info.getValue();
+ if (measurements != null) {
+ measurements[index] = info.getMeasurement();
+ }
+
+ if (dataTypes != null) {
+ dataTypes[index] = info.getDataType();
+ }
+
+ if (values != null) {
+ values[index] = info.getValue();
+ }
});
failedMeasurementIndex2Info.clear();
}
+ public Map<Integer, FailedMeasurementInfo> getFailedMeasurementInfoMap() {
+ return failedMeasurementIndex2Info;
+ }
+
@Override
public void semanticCheck() {
super.semanticCheck();