This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f9cb661d1b Fixed the REST partial insert & Pipe: Added partial insert IT (#17340)
5f9cb661d1b is described below

commit 5f9cb661d1b94135ad30b371a26fa73f20016995
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 11:54:30 2026 +0800

    Fixed the REST partial insert & Pipe: Added partial insert IT (#17340)
    
    * try
    
    * '
    
    * no-bomb
    
    * fix
    
    * Revert "side-effect"
    
    This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b.
    
    * fix
    
    * fix
    
    * fix bug
    
    * fix bug
    
    * ignore flaky test
    
    ---------
    
    Co-authored-by: HTHou <[email protected]>
---
 .../org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 109 ++++++++++++++++++++-
 .../treemodel/auto/basic/IoTDBPipeDataSinkIT.java  |  66 ++++++++++++-
 .../plan/planner/LogicalPlanVisitor.java           |   5 +
 3 files changed, 176 insertions(+), 4 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
index d84a2493cc2..97287dfaba2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.it;
 
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,6 +42,7 @@ import org.apache.http.util.EntityUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -48,12 +50,20 @@ import org.junit.runner.RunWith;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COLUMN_TTL;
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -79,7 +89,7 @@ public class IoTDBRestServiceIT {
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
-  private String getAuthorization(String username, String password) {
+  public static String getAuthorization(String username, String password) {
     return Base64.getEncoder()
         .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
   }
@@ -129,7 +139,7 @@ public class IoTDBRestServiceIT {
     }
   }
 
-  private HttpPost getHttpPost(String url) {
+  public static HttpPost getHttpPost(String url) {
     HttpPost httpPost = new HttpPost(url);
     httpPost.addHeader("Content-type", "application/json; charset=utf-8");
     httpPost.setHeader("Accept", "application/json");
@@ -243,6 +253,101 @@ public class IoTDBRestServiceIT {
     }
   }
 
+  @Ignore // Flaky test
+  @Test
+  public void errorInsertRecords() throws SQLException, InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
+        .setDataReplicationFactor(2);
+    simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+    simpleEnv.initClusterEnvironment(1, 3);
+
+    CloseableHttpResponse response = null;
+    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+    try {
+      HttpPost httpPost =
+          getHttpPost(
+              "http://"
+                  + simpleEnv.getDataNodeWrapper(0).getIp()
+                  + ":"
+                  + simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+                  + "/rest/v2/insertRecords");
+      String json =
+          "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+      httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpPost);
+          break;
+        } catch (Exception e) {
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(507, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+    TimeUnit.SECONDS.sleep(5);
+
+    try {
+      for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
+        dataNodeWrapper.stop();
+        try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
+            Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
+          int count = 0;
+          try (ResultSet resultSet =
+              statementAfterNodeDown.executeQuery(
+                  "select s88, s77, s66, s55, s44, s33 from root.s1")) {
+            ResultSetMetaData metaData = resultSet.getMetaData();
+            while (resultSet.next()) {
+              StringBuilder row = new StringBuilder();
+              for (int i = 0; i < metaData.getColumnCount(); i++) {
+                row.append(resultSet.getString(i + 1)).append(",");
+              }
+              System.out.println(row);
+              count++;
+            }
+          }
+          assertEquals(3, count);
+        }
+        dataNodeWrapper.start();
+        TimeUnit.SECONDS.sleep(1);
+      }
+    } catch (SQLException e) {
+      if (!e.getMessage().contains("Maybe server is down")) {
+        throw e;
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+
   public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
     CloseableHttpResponse response = null;
     try {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index 1fc21fd22b0..36a0ea0cc92 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -29,12 +29,17 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
 import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.Arrays;
@@ -44,6 +49,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.function.Consumer;
 
+import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
+
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2DualTreeAutoBasic.class})
 public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
@@ -54,8 +61,14 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
     super.setUp();
   }
 
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+  }
+
   @Test
-  public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
+  public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -207,7 +220,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
   }
 
   @Test
-  public void testLegacyConnector() throws Exception {
+  public void testLegacySink() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -514,4 +527,53 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
           Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
     }
   }
+
+  @Test
+  public void testSpecialPartialInsert() throws Exception {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe a2b with sink ('node-urls'='%s')",
+              receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+    }
+
+    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+    HttpPost httpPost =
+        getHttpPost(
+            "http://"
+                + senderEnv.getDataNodeWrapper(0).getIp()
+                + ":"
+                + senderEnv.getDataNodeWrapper(0).getRestServicePort()
+                + "/rest/v2/insertRecords");
+    String json =
+        "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+    httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+    for (int i = 0; i < 30; i++) {
+      try {
+        httpClient.execute(httpPost);
+        break;
+      } catch (final Exception e) {
+        if (i == 29) {
+          throw e;
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select s88, s77, s66, s55, s44, s33 from root.s1",
+        "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
+        new HashSet<>(
+            Arrays.asList(
+                "1635232113960,null,null,null,null,1,",
+                "1635232151960,null,null,2.0,2.1,null,",
+                "1635232143960,6.0,4.0,null,null,null,")));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index c2fba05dae6..f69fbd91196 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -792,6 +792,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
               insertRowStatement.getTime(),
               insertRowStatement.getValues(),
               insertRowStatement.isNeedInferType());
+      if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
+        for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
+          insertRowNode.markFailedMeasurement(index);
+        }
+      }
       
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }

Reply via email to