This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5785485eb4f [To dev/1.3] Fixed the REST partial insert & Pipe: Added partial insert IT (#17339)
5785485eb4f is described below

commit 5785485eb4f9e293569e544779c8abb6d95fe344
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 11:26:37 2026 +0800

    [To dev/1.3] Fixed the REST partial insert & Pipe: Added partial insert IT (#17339)
    
    * try
    
    * '
    
    * no-bomb
    
    * side-effect
    
    * Revert "side-effect"
    
    This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b.
    
    * under-flow
    
    * fix
    
    * Fix rest partial insert serde error
    
    * fix testSpecialPartialInsert
    
    * add IT
    
    * fix IT
    
    * ignore flaky test
    
    ---------
    
    Co-authored-by: HTHou <[email protected]>
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  25 +++++
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   5 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   6 ++
 .../org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 109 ++++++++++++++++++++-
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    |  67 ++++++++++++-
 .../plan/planner/LogicalPlanVisitor.java           |   5 +
 6 files changed, 213 insertions(+), 4 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 547b010a322..60ff77aa03c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -440,6 +440,14 @@ public abstract class AbstractEnv implements BaseEnv {
         this);
   }
 
+  @Override
+  public Connection getAvailableConnection(String username, String password) throws SQLException {
+    return new ClusterTestConnection(
+        getWriteConnection(null, username, password),
+        getOneAvailableReadConnection(null, username, password),
+        this);
+  }
+
   @Override
   public Connection getConnection(
       final DataNodeWrapper dataNodeWrapper, final String username, final String password)
@@ -656,6 +664,23 @@ public abstract class AbstractEnv implements BaseEnv {
     return readConnRequestDelegate.requestAll();
   }
 
+  protected List<NodeConnection> getOneAvailableReadConnection(
+      final Constant.Version version, final String username, final String password)
+      throws SQLException {
+    final List<DataNodeWrapper> dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList);
+    Collections.shuffle(dataNodeWrapperListCopy);
+    SQLException lastException = null;
+    for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
+      try {
+        return getReadConnections(version, dataNode, username, password);
+      } catch (final SQLException e) {
+        lastException = e;
+      }
+    }
+    logger.error("Failed to get connection from any DataNode, last exception is ", lastException);
+    throw lastException;
+  }
+
   // use this to avoid some runtimeExceptions when try to get jdbc connections.
   // because it is hard to add retry and handle exception when getting jdbc connections in
   // getWriteConnectionWithSpecifiedDataNode and getReadConnections.
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index efd883e41a9..5c4f91cf5c6 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -136,6 +136,11 @@ public class RemoteServerEnv implements BaseEnv {
     return connection;
   }
 
+  @Override
+  public Connection getAvailableConnection(String username, String password) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
       DataNodeWrapper dataNode, String username, String password) {
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index c3dc9a3eb32..bede2b001f7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -120,6 +120,12 @@ public interface BaseEnv {
   Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password)
       throws SQLException;
 
+  default Connection getAvailableConnection() throws SQLException {
+    return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
+  }
+
+  Connection getAvailableConnection(String username, String password) throws SQLException;
+
   default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
       throws SQLException {
     return getWriteOnlyConnectionWithSpecifiedDataNode(
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
index 53b92918877..ad1b93ad2d8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.it;
 
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,6 +42,7 @@ import org.apache.http.util.EntityUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -48,11 +50,19 @@ import org.junit.runner.RunWith;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
 import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,7 +88,7 @@ public class IoTDBRestServiceIT {
     EnvFactory.getEnv().cleanClusterEnvironment();
   }
 
-  private String getAuthorization(String username, String password) {
+  public static String getAuthorization(String username, String password) {
     return Base64.getEncoder()
         .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
   }
@@ -128,7 +138,7 @@ public class IoTDBRestServiceIT {
     }
   }
 
-  private HttpPost getHttpPost(String url) {
+  public static HttpPost getHttpPost(String url) {
     HttpPost httpPost = new HttpPost(url);
     httpPost.addHeader("Content-type", "application/json; charset=utf-8");
     httpPost.setHeader("Accept", "application/json");
@@ -242,6 +252,101 @@ public class IoTDBRestServiceIT {
     }
   }
 
+  @Ignore // Flaky test
+  @Test
+  public void errorInsertRecords() throws SQLException, InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+        .setSchemaReplicationFactor(3)
+        .setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
+        .setDataReplicationFactor(2);
+    simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+    simpleEnv.initClusterEnvironment(1, 3);
+
+    CloseableHttpResponse response = null;
+    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+    try {
+      HttpPost httpPost =
+          getHttpPost(
+              "http://"
+                  + simpleEnv.getDataNodeWrapper(0).getIp()
+                  + ":"
+                  + simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+                  + "/rest/v2/insertRecords");
+      String json =
+          "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+      httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+      for (int i = 0; i < 30; i++) {
+        try {
+          response = httpClient.execute(httpPost);
+          break;
+        } catch (Exception e) {
+          if (i == 29) {
+            throw e;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+
+      HttpEntity responseEntity = response.getEntity();
+      String message = EntityUtils.toString(responseEntity, "utf-8");
+      JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+      assertEquals(507, Integer.parseInt(result.get("code").toString()));
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    }
+    TimeUnit.SECONDS.sleep(5);
+
+    try {
+      for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
+        dataNodeWrapper.stop();
+        try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
+            Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
+          int count = 0;
+          try (ResultSet resultSet =
+              statementAfterNodeDown.executeQuery(
+                  "select s88, s77, s66, s55, s44, s33 from root.s1")) {
+            ResultSetMetaData metaData = resultSet.getMetaData();
+            while (resultSet.next()) {
+              StringBuilder row = new StringBuilder();
+              for (int i = 0; i < metaData.getColumnCount(); i++) {
+                row.append(resultSet.getString(i + 1)).append(",");
+              }
+              System.out.println(row);
+              count++;
+            }
+          }
+          assertEquals(3, count);
+        }
+        dataNodeWrapper.start();
+        TimeUnit.SECONDS.sleep(1);
+      }
+    } catch (SQLException e) {
+      if (!e.getMessage().contains("Maybe server is down")) {
+        throw e;
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+
   public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
     CloseableHttpResponse response = null;
     try {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 5216fe20a27..86e53481074 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -28,11 +28,16 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.nio.charset.Charset;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.Arrays;
@@ -42,11 +47,20 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.function.Consumer;
 
+import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
+
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+  }
+
   @Test
-  public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
+  public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -196,7 +210,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
   }
 
   @Test
-  public void testLegacyConnector() throws Exception {
+  public void testLegacySink() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -503,4 +517,53 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
           Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
     }
   }
+
+  @Test
+  public void testSpecialPartialInsert() throws Exception {
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe a2b with sink ('node-urls'='%s')",
+              receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+    }
+
+    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+    HttpPost httpPost =
+        getHttpPost(
+            "http://"
+                + senderEnv.getDataNodeWrapper(0).getIp()
+                + ":"
+                + senderEnv.getDataNodeWrapper(0).getRestServicePort()
+                + "/rest/v2/insertRecords");
+    String json =
+        "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+    httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+    for (int i = 0; i < 30; i++) {
+      try {
+        httpClient.execute(httpPost);
+        break;
+      } catch (final Exception e) {
+        if (i == 29) {
+          throw e;
+        }
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select s88, s77, s66, s55, s44, s33 from root.s1",
+        "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
+        new HashSet<>(
+            Arrays.asList(
+                "1635232113960,null,null,null,null,1,",
+                "1635232151960,null,null,2.0,2.1,null,",
+                "1635232143960,6.0,4.0,null,null,null,")));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index ee2ccc30586..be0b28a0282 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -714,6 +714,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
               insertRowStatement.getTime(),
               insertRowStatement.getValues(),
               insertRowStatement.isNeedInferType());
+      if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
+        for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
+          insertRowNode.markFailedMeasurement(index);
+        }
+      }
       
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }

Reply via email to