This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5785485eb4f [To dev/1.3] Fixed the REST partial insert & Pipe: Added partial insert IT (#17339)
5785485eb4f is described below
commit 5785485eb4f9e293569e544779c8abb6d95fe344
Author: Caideyipi <[email protected]>
AuthorDate: Wed Mar 25 11:26:37 2026 +0800
[To dev/1.3] Fixed the REST partial insert & Pipe: Added partial insert IT (#17339)
* try
* '
* no-bomb
* side-effect
* Revert "side-effect"
This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b.
* under-flow
* fix
* Fix rest partial insert serde error
* fix testSpecialPartialInsert
* add IT
* fix IT
* ignore flaky test
---------
Co-authored-by: HTHou <[email protected]>
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 25 +++++
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 5 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 6 ++
.../org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 109 ++++++++++++++++++++-
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 67 ++++++++++++-
.../plan/planner/LogicalPlanVisitor.java | 5 +
6 files changed, 213 insertions(+), 4 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 547b010a322..60ff77aa03c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -440,6 +440,14 @@ public abstract class AbstractEnv implements BaseEnv {
this);
}
+ @Override
+ public Connection getAvailableConnection(String username, String password)
throws SQLException {
+ return new ClusterTestConnection(
+ getWriteConnection(null, username, password),
+ getOneAvailableReadConnection(null, username, password),
+ this);
+ }
+
@Override
public Connection getConnection(
final DataNodeWrapper dataNodeWrapper, final String username, final
String password)
@@ -656,6 +664,23 @@ public abstract class AbstractEnv implements BaseEnv {
return readConnRequestDelegate.requestAll();
}
+ protected List<NodeConnection> getOneAvailableReadConnection(
+ final Constant.Version version, final String username, final String
password)
+ throws SQLException {
+ final List<DataNodeWrapper> dataNodeWrapperListCopy = new
ArrayList<>(dataNodeWrapperList);
+ Collections.shuffle(dataNodeWrapperListCopy);
+ SQLException lastException = null;
+ for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
+ try {
+ return getReadConnections(version, dataNode, username, password);
+ } catch (final SQLException e) {
+ lastException = e;
+ }
+ }
+ logger.error("Failed to get connection from any DataNode, last exception
is ", lastException);
+ throw lastException;
+ }
+
// use this to avoid some runtimeExceptions when try to get jdbc connections.
// because it is hard to add retry and handle exception when getting jdbc
connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index efd883e41a9..5c4f91cf5c6 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -136,6 +136,11 @@ public class RemoteServerEnv implements BaseEnv {
return connection;
}
+ @Override
+ public Connection getAvailableConnection(String username, String password)
throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
DataNodeWrapper dataNode, String username, String password) {
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index c3dc9a3eb32..bede2b001f7 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -120,6 +120,12 @@ public interface BaseEnv {
Connection getConnection(DataNodeWrapper dataNodeWrapper, String username,
String password)
throws SQLException;
+ default Connection getAvailableConnection() throws SQLException {
+ return getAvailableConnection(SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD);
+ }
+
+ Connection getAvailableConnection(String username, String password) throws
SQLException;
+
default Connection
getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
throws SQLException {
return getWriteOnlyConnectionWithSpecifiedDataNode(
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
index 53b92918877..ad1b93ad2d8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.it;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,6 +42,7 @@ import org.apache.http.util.EntityUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -48,11 +50,19 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -78,7 +88,7 @@ public class IoTDBRestServiceIT {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- private String getAuthorization(String username, String password) {
+ public static String getAuthorization(String username, String password) {
return Base64.getEncoder()
.encodeToString((username + ":" +
password).getBytes(StandardCharsets.UTF_8));
}
@@ -128,7 +138,7 @@ public class IoTDBRestServiceIT {
}
}
- private HttpPost getHttpPost(String url) {
+ public static HttpPost getHttpPost(String url) {
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
@@ -242,6 +252,101 @@ public class IoTDBRestServiceIT {
}
}
+ @Ignore // Flaky test
+ @Test
+ public void errorInsertRecords() throws SQLException, InterruptedException {
+ SimpleEnv simpleEnv = new SimpleEnv();
+ simpleEnv
+ .getConfig()
+ .getCommonConfig()
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaReplicationFactor(3)
+ .setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
+ .setDataReplicationFactor(2);
+ simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ simpleEnv.initClusterEnvironment(1, 3);
+
+ CloseableHttpResponse response = null;
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+ try {
+ HttpPost httpPost =
+ getHttpPost(
+ "http://"
+ + simpleEnv.getDataNodeWrapper(0).getIp()
+ + ":"
+ + simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+ + "/rest/v2/insertRecords");
+ String json =
+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ for (int i = 0; i < 30; i++) {
+ try {
+ response = httpClient.execute(httpPost);
+ break;
+ } catch (Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ HttpEntity responseEntity = response.getEntity();
+ String message = EntityUtils.toString(responseEntity, "utf-8");
+ JsonObject result = JsonParser.parseString(message).getAsJsonObject();
+ assertEquals(507, Integer.parseInt(result.get("code").toString()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ try {
+ if (response != null) {
+ response.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ TimeUnit.SECONDS.sleep(5);
+
+ try {
+ for (DataNodeWrapper dataNodeWrapper :
simpleEnv.getDataNodeWrapperList()) {
+ dataNodeWrapper.stop();
+ try (Connection connectionAfterNodeDown =
simpleEnv.getAvailableConnection();
+ Statement statementAfterNodeDown =
connectionAfterNodeDown.createStatement()) {
+ int count = 0;
+ try (ResultSet resultSet =
+ statementAfterNodeDown.executeQuery(
+ "select s88, s77, s66, s55, s44, s33 from root.s1")) {
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ while (resultSet.next()) {
+ StringBuilder row = new StringBuilder();
+ for (int i = 0; i < metaData.getColumnCount(); i++) {
+ row.append(resultSet.getString(i + 1)).append(",");
+ }
+ System.out.println(row);
+ count++;
+ }
+ }
+ assertEquals(3, count);
+ }
+ dataNodeWrapper.start();
+ TimeUnit.SECONDS.sleep(1);
+ }
+ } catch (SQLException e) {
+ if (!e.getMessage().contains("Maybe server is down")) {
+ throw e;
+ }
+ } finally {
+ simpleEnv.cleanClusterEnvironment();
+ }
+ }
+
public void rightInsertTablet(CloseableHttpClient httpClient, String json,
HttpPost httpPost) {
CloseableHttpResponse response = null;
try {
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 5216fe20a27..86e53481074 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -28,11 +28,16 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
@@ -42,11 +47,20 @@ import java.util.HashSet;
import java.util.Map;
import java.util.function.Consumer;
+import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+ senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
+ }
+
@Test
- public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
+ public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -196,7 +210,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
}
@Test
- public void testLegacyConnector() throws Exception {
+ public void testLegacySink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -503,4 +517,53 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,",
"2,1.0,"))));
}
}
+
+ @Test
+ public void testSpecialPartialInsert() throws Exception {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b with sink ('node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ }
+
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+ HttpPost httpPost =
+ getHttpPost(
+ "http://"
+ + senderEnv.getDataNodeWrapper(0).getIp()
+ + ":"
+ + senderEnv.getDataNodeWrapper(0).getRestServicePort()
+ + "/rest/v2/insertRecords");
+ String json =
+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
+ httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
+ for (int i = 0; i < 30; i++) {
+ try {
+ httpClient.execute(httpPost);
+ break;
+ } catch (final Exception e) {
+ if (i == 29) {
+ throw e;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select s88, s77, s66, s55, s44, s33 from root.s1",
+ "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
+ new HashSet<>(
+ Arrays.asList(
+ "1635232113960,null,null,null,null,1,",
+ "1635232151960,null,null,2.0,2.1,null,",
+ "1635232143960,6.0,4.0,null,null,null,")));
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index ee2ccc30586..be0b28a0282 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -714,6 +714,11 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
insertRowStatement.getTime(),
insertRowStatement.getValues(),
insertRowStatement.isNeedInferType());
+ if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
+ for (Integer index :
insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
+ insertRowNode.markFailedMeasurement(index);
+ }
+ }
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}