This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f9cb661d1b Fixed the REST partial insert & Pipe: Added partial insert IT (#17340)
5f9cb661d1b is described below
commit 5f9cb661d1b94135ad30b371a26fa73f20016995
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 11:54:30 2026 +0800
Fixed the REST partial insert & Pipe: Added partial insert IT (#17340)
* try
* '
* no-bomb
* fix
* Revert "side-effect"
This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b.
* fix
* fix
* fix bug
* fix bug
* ignore flaky test
---------
Co-authored-by: HTHou <[email protected]>
---
.../org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 109 ++++++++++++++++++++-
.../treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 66 ++++++++++++-
.../plan/planner/LogicalPlanVisitor.java | 5 +
3 files changed, 176 insertions(+), 4 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
index d84a2493cc2..97287dfaba2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.it;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,6 +42,7 @@ import org.apache.http.util.EntityUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -48,12 +50,20 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COLUMN_TTL;
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -79,7 +89,7 @@ public class IoTDBRestServiceIT {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- private String getAuthorization(String username, String password) {
+ public static String getAuthorization(String username, String password) {
return Base64.getEncoder()
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
}
@@ -129,7 +139,7 @@ public class IoTDBRestServiceIT {
}
}
- private HttpPost getHttpPost(String url) {
+ public static HttpPost getHttpPost(String url) {
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
@@ -243,6 +253,101 @@ public class IoTDBRestServiceIT {
}
}
+ @Ignore // Flaky test
+ @Test
+ public void errorInsertRecords() throws SQLException, InterruptedException {
+ SimpleEnv simpleEnv = new SimpleEnv();
+ simpleEnv
+ .getConfig()
+ .getCommonConfig()
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaReplicationFactor(3)
+ .setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
+ .setDataReplicationFactor(2);
+ simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ simpleEnv.initClusterEnvironment(1, 3);
+
+ CloseableHttpResponse response = null;
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+ try {
+ HttpPost httpPost =
+ getHttpPost(
+ "http://"
+ + simpleEnv.getDataNodeWrapper(0).getIp()
+ + ":"
+ + simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+ + "/rest/v2/insertRecords");
+ String json =
+ "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpPost);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(507, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ TimeUnit.SECONDS.sleep(5);
+
+ try {
+ for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
+ dataNodeWrapper.stop();
+ try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
+ Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
+ int count = 0;
+ try (ResultSet resultSet =
+ statementAfterNodeDown.executeQuery(
+ "select s88, s77, s66, s55, s44, s33 from root.s1")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ while (resultSet.next()) {
+ StringBuilder row = new StringBuilder();
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ row.append(resultSet.getString(i + 1)).append(",");
+ }
+ System.out.println(row);
+ count++;
+ }
+ }
+ assertEquals(3, count);
+ }
+ dataNodeWrapper.start();
+ TimeUnit.SECONDS.sleep(1);
+ }
+ } catch (SQLException e) {
+ if (!e.getMessage().contains("Maybe server is down")) {
+ throw e;
+ }
+ } finally {
+ simpleEnv.cleanClusterEnvironment();
+ }
+ }
+
public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
CloseableHttpResponse response = null;
try {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
index 1fc21fd22b0..36a0ea0cc92 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java
@@ -29,12 +29,17 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
@@ -44,6 +49,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.function.Consumer;
+import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTreeAutoBasic.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
@@ -54,8 +61,14 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
super.setUp();
}
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ }
+
@Test
- public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
+ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -207,7 +220,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
}
@Test
- public void testLegacyConnector() throws Exception {
+ public void testLegacySink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -514,4 +527,53 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
}
}
+
+ @Test
+ public void testSpecialPartialInsert() throws Exception {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b with sink ('node-urls'='%s')",
+ receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ }
+
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+ HttpPost httpPost =
+ getHttpPost(
+ "http://"
+ + senderEnv.getDataNodeWrapper(0).getIp()
+ + ":"
+ + senderEnv.getDataNodeWrapper(0).getRestServicePort()
+ + "/rest/v2/insertRecords");
+ String json =
+ "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ for (int i = 0; i < 30; i++) {
+ try {
+ httpClient.execute(httpPost);
+ break;
+ } catch (final Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select s88, s77, s66, s55, s44, s33 from root.s1",
+ "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
+ new HashSet<>(
+ Arrays.asList(
+ "1635232113960,null,null,null,null,1,",
+ "1635232151960,null,null,2.0,2.1,null,",
+ "1635232143960,6.0,4.0,null,null,null,")));
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index c2fba05dae6..f69fbd91196 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -792,6 +792,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
insertRowStatement.getTime(),
insertRowStatement.getValues(),
insertRowStatement.isNeedInferType());
+ if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
+ for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
+ insertRowNode.markFailedMeasurement(index);
+ }
+ }
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}