This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 43b5431df26 [To rel/1.2] Pipe: fix NPE in
PipeEnrichedInsertBaseStatement#isQuery when analyzing statement in metadata
mismatch scenarios (#11203) (#11206)
43b5431df26 is described below
commit 43b5431df26da241b163bcc16605e46e3bf7be24
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Sep 25 02:00:58 2023 +0800
[To rel/1.2] Pipe: fix NPE in PipeEnrichedInsertBaseStatement#isQuery when
analyzing statement in metadata mismatch scenarios (#11203) (#11206)
Consider the following data synchronization scenario (metadata mismatch)
using the Pipe engine.
1. Start two instances of IoTDB:
- A datanode -> 127.0.0.1:6667
- B datanode -> 127.0.0.1:6668
**NOTE**: IoTDB B should be configured with the following settings in
`iotdb-common.properties`:
```properties
enable_partial_insert=false
enable_auto_create_schema=false
```
2. Connect to IoTDB B (6668) via the CLI and send:
```sql
create TIMESERIES root.sg.d1.s0 with datatype=float;
create TIMESERIES root.sg.d1.s1 with datatype=float;
```
3. Connect to IoTDB A (6667) via the CLI and send:
```sql
create pipe test
with connector (
'connector'='iotdb-thrift-connector',
'connector.ip'='127.0.0.1',
'connector.port'='6668'
);
start pipe test;
create TIMESERIES root.sg.d1.s0 with datatype=text;
create TIMESERIES root.sg.d1.s1 with datatype=float;
insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2);
```
---
In IoTDB B, the following errors occur:
```plain
2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer
java.lang.NullPointerException: null
    at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70)
    at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49)
    at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310)
    at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170)
    at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113)
    at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43)
    at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549)
    at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295)
    at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
    at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
By debugging and tracing back to the source of the error through the following call chain:
- `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)`
- `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process`
- `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert`
- `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)`
- `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation`
- `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType`

we found that the error originates in `CommonUtils.parseValue`, called from
`transferType`, which throws a `QueryProcessException`. This exception is
caught by `validateSchema`, which sets the `finishQueryAfterAnalyze` field of the
`analysis` object to true. This subsequently leads to the `statement`
field being null, ultimately triggering a `NullPointerException` in
`PipeEnrichedInsertBaseStatement#isQuery`.
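To make the final step of this chain concrete, here is a minimal, self-contained sketch of the failure mode: a wrapper that delegates `isQuery()` to a field that happens to be null. The names `NullDelegateSketch`, `InnerStatement` and `UnguardedWrapper` are hypothetical stand-ins for illustration only and are not IoTDB classes.

```java
public class NullDelegateSketch {

    /** Hypothetical stand-in for the wrapped insert statement. */
    static class InnerStatement {
        boolean isQuery() {
            return false; // an insert is never a query
        }
    }

    /** Hypothetical stand-in for a wrapper that delegates without a null check. */
    static class UnguardedWrapper {
        private final InnerStatement inner;

        UnguardedWrapper(InnerStatement inner) {
            this.inner = inner;
        }

        boolean isQuery() {
            // Dereferences the field directly, so a null field throws an NPE.
            return inner.isQuery();
        }
    }

    /** Returns true if calling isQuery() on the wrapper throws a NullPointerException. */
    static boolean throwsNpe(UnguardedWrapper wrapper) {
        try {
            wrapper.isQuery();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsNpe(new UnguardedWrapper(new InnerStatement())));
        System.out.println(throwsNpe(new UnguardedWrapper(null)));
    }
}
```

In this sketch the unguarded delegation is harmless while the wrapped object is present and only fails once it is missing, which matches the shape of the stack trace above.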
The purpose of this PR is to fix this NPE, **so that both the sender (IoTDB A)
and the receiver (IoTDB B) get more accurate error information**.
Note that the default value of `enable_partial_insert` in
`iotdb-common.properties` is true. Under the default configuration, **the
above data synchronization scenario will NOT trigger a `NullPointerException`
(NPE)**, because in that case the `transferType` method
does not rethrow the **`QueryProcessException`**.
```java
try {
  values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
} catch (Exception e) {
  LOGGER.warn(
      "data type of {}.{} is not consistent, "
          + "registered type {}, inserting timestamp {}, value {}",
      devicePath,
      measurements[i],
      dataTypes[i],
      time,
      values[i]);
  if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE
    throw e;
  } else {
    markFailedMeasurement(i, e);
  }
}
```
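The effect of the flag can be reproduced in isolation. The following is a simplified sketch, not the IoTDB implementation: `PartialInsertSketch` and `convert` are hypothetical names, `Float.parseFloat` stands in for `CommonUtils.parseValue`, and a list of failed indices stands in for `markFailedMeasurement`.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialInsertSketch {

    /**
     * Parses each raw value as a float. When partial insert is enabled, failures
     * are recorded and the remaining values are still processed; otherwise the
     * first failure is rethrown to the caller.
     */
    static List<Integer> convert(String[] raw, float[] out, boolean enablePartialInsert) {
        List<Integer> failedIndices = new ArrayList<>();
        for (int i = 0; i < raw.length; i++) {
            try {
                out[i] = Float.parseFloat(raw[i]);
            } catch (NumberFormatException e) {
                if (!enablePartialInsert) {
                    throw e; // the whole insert fails
                }
                failedIndices.add(i); // only this measurement fails
            }
        }
        return failedIndices;
    }

    public static void main(String[] args) {
        String[] raw = {"one", "2.2"};
        System.out.println(convert(raw, new float[2], true));
        try {
            convert(raw, new float[2], false);
        } catch (NumberFormatException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

With the flag enabled, the text value is recorded as a failed measurement and the float value is still converted. With the flag disabled, the exception propagates to the caller, which is the path that led to the NPE described above.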
The `markFailedMeasurement` path also causes the analysis to be marked as
failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔.
```java
if (hasFailedMeasurement) {
  partialInsertMessage =
      String.format(
          "Fail to insert measurements %s caused by %s",
          insertStatement.getFailedMeasurements(),
          insertStatement.getFailedMessages());
  logger.warn(partialInsertMessage);
  analysis.setFailStatus(
      RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
}
```
In this situation, the execution state machine's transition to a failed
state is reasonable:
```java
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
  stateMachine.transitionToFailed(analysis.getFailStatus());
}
```
and the following warning logs are recorded:
```plain
2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one])
2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]
```
Therefore, when `enable_partial_insert` is set to false, we only need to
guard against THIS NPE **directly** to get the expected behavior.
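A null-guarded delegation of this kind can be sketched as follows. This is an illustrative sketch only: `GuardedDelegateSketch`, `InnerStatement` and `GuardedWrapper` are hypothetical names, and `Objects.nonNull(x)` is used here as an equivalent of `!Objects.isNull(x)`.

```java
import java.util.Objects;

public class GuardedDelegateSketch {

    /** Hypothetical stand-in for the wrapped statement. */
    static class InnerStatement {
        private final boolean query;

        InnerStatement(boolean query) {
            this.query = query;
        }

        boolean isQuery() {
            return query;
        }
    }

    /** Hypothetical wrapper that tolerates a missing inner statement. */
    static class GuardedWrapper {
        private final InnerStatement inner;

        GuardedWrapper(InnerStatement inner) {
            this.inner = inner;
        }

        boolean isQuery() {
            // Short-circuit: inner.isQuery() is only evaluated when inner is non-null.
            return Objects.nonNull(inner) && inner.isQuery();
        }
    }

    public static void main(String[] args) {
        System.out.println(new GuardedWrapper(null).isQuery());
        System.out.println(new GuardedWrapper(new InnerStatement(true)).isQuery());
    }
}
```

In this sketch a missing inner statement is reported as "not a query" instead of throwing, so the caller can continue and surface the underlying failure status rather than an unrelated NPE.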
---
.../plan/statement/crud/PipeEnrichedInsertBaseStatement.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
index 10834b20ecc..c6f43c3f7ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
@@ -67,7 +68,7 @@ public class PipeEnrichedInsertBaseStatement extends
InsertBaseStatement {
@Override
public boolean isQuery() {
- return insertBaseStatement.isQuery();
+ return !Objects.isNull(insertBaseStatement) &&
insertBaseStatement.isQuery();
}
@Override