This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 43b5431df26 [To rel/1.2] Pipe: fix NPE in PipeEnrichedInsertBaseStatement#isQuery when analyzing statement in metadata mismatch scenarios (#11203) (#11206)
43b5431df26 is described below

commit 43b5431df26da241b163bcc16605e46e3bf7be24
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Sep 25 02:00:58 2023 +0800

    [To rel/1.2] Pipe: fix NPE in PipeEnrichedInsertBaseStatement#isQuery when analyzing statement in metadata mismatch scenarios (#11203) (#11206)
    
    Consider the following data synchronization scenario (metadata mismatch) using the Pipe engine.
    
    1. Start two instances of IoTDB:
    
    - A datanode -> 127.0.0.1:6667
    - B datanode -> 127.0.0.1:6668
    
    **NOTE**: IoTDB B should be configured with the following settings in `iotdb-common.properties`:
    
    ```properties
    enable_partial_insert=false
    enable_auto_create_schema=false
    ```
    
    2. Connect to IoTDB B (6668) with the CLI and run:
    
    ```sql
    create TIMESERIES root.sg.d1.s0 with datatype=float;
    create TIMESERIES root.sg.d1.s1 with datatype=float;
    ```
    
    3. Connect to IoTDB A (6667) with the CLI and run:
    
    ```sql
    create pipe test
    with connector (
        'connector'='iotdb-thrift-connector',
        'connector.ip'='127.0.0.1',
        'connector.port'='6668'
    );
    
    start pipe test;
    
    create TIMESERIES root.sg.d1.s0 with datatype=text;
    create TIMESERIES root.sg.d1.s1 with datatype=float;
    
    insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2);
    ```
    
    ---
    
    In IoTDB B, the following errors occur:
    
    ```plain
    2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer
    java.lang.NullPointerException: null
            at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70)
            at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49)
            at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310)
            at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170)
            at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113)
            at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147)
            at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498)
            at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241)
            at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111)
            at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43)
            at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549)
            at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295)
            at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275)
            at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
            at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64)
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    By debugging and tracing back to the source of the error:
    
    - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)`
    - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process`
    - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert`
    - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)`
    - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation`
    - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType`
    
    we found that the error occurs in `CommonUtils.parseValue` in 
`transferType`, where it throws a `QueryProcessException`. This exception is 
caught by `validateSchema`, causing the `finishQueryAfterAnalyze` field of the 
`analysis` object to be set to true. This subsequently leads to the `statement` 
field being null, ultimately triggering a `NullPointerException`.
    
    The purpose of this PR is to fix this NPE, **providing more accurate error information for both the sender (IoTDB A) and the receiver (IoTDB B)**.
    
    Note that `enable_partial_insert` defaults to true in `iotdb-common.properties`. **Under the default configuration, the synchronization scenario above does NOT trigger a `NullPointerException` (NPE)**, because the `transferType` method does not rethrow the **`QueryProcessException`** in that case.
    
    ```java
    try {
      values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
    } catch (Exception e) {
      LOGGER.warn(
          "data type of {}.{} is not consistent, "
              + "registered type {}, inserting timestamp {}, value {}",
          devicePath,
          measurements[i],
          dataTypes[i],
          time,
          values[i]);
      if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE
        throw e;
      } else {
        markFailedMeasurement(i, e);
      }
    }
    ```
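    
    The effect of the flag can be illustrated with a toy model. The sketch below is not IoTDB code: `PartialInsertSketch`, `parseAll`, and `failedIndexes` are hypothetical names, and `Float.parseFloat` merely stands in for `CommonUtils.parseValue`. It only shows the branching: with the flag off, the first bad value aborts the whole insert; with it on, the bad column is recorded and the remaining columns are still parsed.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Toy model (assumption, not the IoTDB implementation) of the
    // "rethrow or mark failed" branch shown above.
    class PartialInsertSketch {
      // Indexes of the values that could not be parsed (hypothetical field).
      final List<Integer> failedIndexes = new ArrayList<>();
    
      Float[] parseAll(String[] raw, boolean enablePartialInsert) {
        Float[] parsed = new Float[raw.length];
        for (int i = 0; i < raw.length; i++) {
          try {
            parsed[i] = Float.parseFloat(raw[i]);
          } catch (NumberFormatException e) {
            if (!enablePartialInsert) {
              throw e; // strict mode: the whole insert is rejected
            }
            failedIndexes.add(i); // partial mode: only this column is dropped
          }
        }
        return parsed;
      }
    }
    
    class PartialInsertSketchDemo {
      public static void main(String[] args) {
        String[] row = {"one", "2.2"};
    
        PartialInsertSketch partial = new PartialInsertSketch();
        partial.parseAll(row, true);
        System.out.println("partial mode, failed columns: " + partial.failedIndexes);
    
        try {
          new PartialInsertSketch().parseAll(row, false);
        } catch (NumberFormatException e) {
          System.out.println("strict mode: insert rejected");
        }
      }
    }
    ```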
    
    The `markFailedMeasurement` method also causes the analysis to be marked as failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔.
    
    ```java
    if (hasFailedMeasurement) {
      partialInsertMessage =
          String.format(
              "Fail to insert measurements %s caused by %s",
              insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
      logger.warn(partialInsertMessage);
      analysis.setFailStatus(
          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
    }
    ```
    
    In this situation, the execution state machine's transition to a failed 
state is reasonable:
    
    ```java
    if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
      stateMachine.transitionToFailed(analysis.getFailStatus());
    }
    ```
    
    and warning logs are recorded as follows:
    
    ```plain
    2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN  o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one])
    2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN  o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]
    ```
    
    Therefore, when `enable_partial_insert` is set to false, we only need to 
avoid THIS NPE **directly** to achieve the expected purpose.
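    
    A minimal, self-contained sketch of that kind of guard, for illustration only: `InnerStatement`, `EnrichedStatement`, and both method names are hypothetical stand-ins rather than the IoTDB classes, and the sketch assumes a wrapper whose delegate may be null by the time `isQuery` is called.
    
    ```java
    import java.util.Objects;
    
    // Hypothetical stand-in for the wrapped insert statement.
    class InnerStatement {
      boolean isQuery() {
        return false; // an insert is a write, not a query
      }
    }
    
    // Hypothetical wrapper that delegates to an inner statement which may be null.
    class EnrichedStatement {
      private final InnerStatement inner;
    
      EnrichedStatement(InnerStatement inner) {
        this.inner = inner;
      }
    
      // Unguarded delegation: throws NullPointerException when inner is null.
      boolean isQueryUnguarded() {
        return inner.isQuery();
      }
    
      // Guarded delegation: a missing inner statement is reported as "not a query".
      boolean isQueryGuarded() {
        return !Objects.isNull(inner) && inner.isQuery();
      }
    }
    
    class NullGuardDemo {
      public static void main(String[] args) {
        EnrichedStatement wrapper = new EnrichedStatement(null);
        try {
          wrapper.isQueryUnguarded();
        } catch (NullPointerException e) {
          System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded: " + wrapper.isQueryGuarded());
      }
    }
    ```
    
    Because `&&` short-circuits, the delegate is never dereferenced when it is null, so the caller gets a plain `false` and the failure status recorded during analysis can be reported instead.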
---
 .../plan/statement/crud/PipeEnrichedInsertBaseStatement.java           | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
index 10834b20ecc..c6f43c3f7ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
 
@@ -67,7 +68,7 @@ public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
 
   @Override
   public boolean isQuery() {
-    return insertBaseStatement.isQuery();
+    return !Objects.isNull(insertBaseStatement) && insertBaseStatement.isQuery();
   }
 
   @Override
