This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7563ac8f6dc Pipe IT: Stablized the
IoTDBPipeClusterIT.testPipeAfterDataRegionLeaderStop (#17725)
7563ac8f6dc is described below
commit 7563ac8f6dcee0fd360904f3432c0c905687ecd8
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 20 14:15:33 2026 +0800
Pipe IT: Stablized the IoTDBPipeClusterIT.testPipeAfterDataRegionLeaderStop
(#17725)
---
.../manual/enhanced/IoTDBPipeClusterIT.java | 178 +++++++++++++++++----
1 file changed, 143 insertions(+), 35 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index ad283d4a02c..3a0aaec2aed 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -51,6 +53,8 @@ import org.junit.runner.RunWith;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -59,12 +63,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTableManualEnhanced.class})
public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
+ private static final double SYNC_LAG_DELTA = 0.001;
+
@Override
@Before
public void setUp() {
@@ -299,41 +306,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
- final AtomicInteger leaderPort = new AtomicInteger(-1);
- final TShowRegionResp showRegionResp =
- client.showRegion(new TShowRegionReq().setIsTableModel(true));
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderPort.set(regionInfo.getClientRpcPort());
- }
- });
-
- int leaderIndex = -1;
- for (int i = 0; i < 3; ++i) {
- if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get())
{
- leaderIndex = i;
- try {
- senderEnv.shutdownDataNode(i);
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (final InterruptedException ignored) {
- }
- try {
- senderEnv.startDataNode(i);
- ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- }
- }
+ final int leaderIndex = restartTableDataRegionLeader(client,
"test1");
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}
@@ -343,6 +316,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
TableModelUtils.assertData("test", "test", 0, 300, receiverEnv,
handleFailure);
+ waitForTableDataRegionReplicationComplete(Arrays.asList("test",
"test1"));
}
try {
@@ -398,6 +372,140 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
}
}
+ private int restartTableDataRegionLeader(
+ final SyncConfigNodeIServiceClient client, final String database) throws
TException {
+ final List<TRegionInfo> leaderRegionInfoList =
+ showTableDataRegionLeaders(Collections.singletonList(database),
client);
+ if (leaderRegionInfoList.isEmpty()) {
+ return -1;
+ }
+
+ final TRegionInfo targetRegionInfo =
+ leaderRegionInfoList.stream()
+ .min(Comparator.comparingInt(regionInfo ->
regionInfo.getConsensusGroupId().getId()))
+ .orElse(null);
+ if (targetRegionInfo == null) {
+ return -1;
+ }
+
+ final int leaderPort = targetRegionInfo.getClientRpcPort();
+ for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+ if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+ continue;
+ }
+
+ try {
+ senderEnv.shutdownDataNode(i);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return -1;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ return -1;
+ }
+
+ try {
+ senderEnv.startDataNode(i);
+ ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return -1;
+ }
+ return i;
+ }
+ return -1;
+ }
+
+ private void waitForTableDataRegionReplicationComplete(final List<String>
databases) {
+ await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TRegionInfo> leaderRegionInfoList =
+ showTableDataRegionLeaders(databases, client);
+ Assert.assertFalse(
+ "No table DataRegion leader found for databases " +
databases,
+ leaderRegionInfoList.isEmpty());
+
+ for (final TRegionInfo regionInfo : leaderRegionInfoList) {
+ final DataNodeWrapper leaderNode =
+ findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+ final String metricsUrl =
+ "http://"
+ + leaderNode.getIp()
+ + ":"
+ + leaderNode.getMetricPort()
+ + "/metrics";
+ final String metricsContent =
senderEnv.getUrlContent(metricsUrl, null);
+ Assert.assertNotNull(
+ "Failed to fetch metrics from leader DataNode at " +
metricsUrl,
+ metricsContent);
+ assertSyncLagIsZero(metricsContent,
buildDataRegionTag(regionInfo), metricsUrl);
+ }
+ }
+ });
+ }
+
+ private List<TRegionInfo> showTableDataRegionLeaders(
+ final List<String> databases, final SyncConfigNodeIServiceClient client)
throws TException {
+ final TShowRegionResp showRegionResp =
+ client.showRegion(
+ new TShowRegionReq()
+ .setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .setDatabases(databases)
+ .setIsTableModel(true));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
+ final List<TRegionInfo> result = new ArrayList<>();
+ for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ result.add(regionInfo);
+ }
+ }
+ return result;
+ }
+
+ private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+ for (final DataNodeWrapper dataNodeWrapper :
senderEnv.getDataNodeWrapperList()) {
+ if (dataNodeWrapper.getPort() == port) {
+ return dataNodeWrapper;
+ }
+ }
+ fail("Failed to find DataNodeWrapper for client rpc port " + port);
+ return null;
+ }
+
+ private String buildDataRegionTag(final TRegionInfo regionInfo) {
+ return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]";
+ }
+
+ private void assertSyncLagIsZero(
+ final String metricsContent, final String dataRegionTag, final String
metricsUrl) {
+ for (final String line : metricsContent.split("\\R")) {
+ if (!line.startsWith("iot_consensus{")
+ || !line.contains("type=\"syncLag\"")
+ || !line.contains("region=\"" + dataRegionTag + "\"")) {
+ continue;
+ }
+ final int lastSpaceIndex = line.lastIndexOf(' ');
+ Assert.assertTrue("Malformed syncLag metric line: " + line,
lastSpaceIndex > 0);
+ Assert.assertEquals(
+ "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl
+ " but got " + line,
+ 0.0,
+ Double.parseDouble(line.substring(lastSpaceIndex + 1)),
+ SYNC_LAG_DELTA);
+ return;
+ }
+ fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+ }
+
@Test
public void testPipeAfterRegisterNewDataNode() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);