This is an automated email from the ASF dual-hosted git repository. ericpai pushed a commit to branch improve/iotdb-5410 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 10bca3d3e81a8e31bbcdda7d40a4457bacebac2e Author: ericpai <[email protected]> AuthorDate: Thu Jan 12 17:48:22 2023 +0800 [IOTDB-5410] Refine IT: Refine Session and SessionPool IT --- integration-test/import-control.xml | 2 +- .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 30 ---- .../iotdb/session/it/pool/SessionPoolIT.java | 152 +++++---------------- .../java/org/apache/iotdb/session/TabletTest.java | 81 +++++++++++ .../apache/iotdb/session/pool/SessionPoolTest.java | 63 +++++++++ 5 files changed, 181 insertions(+), 147 deletions(-) diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml index 0465588691..4c685eca80 100644 --- a/integration-test/import-control.xml +++ b/integration-test/import-control.xml @@ -89,7 +89,7 @@ <allow class="org.apache.iotdb.commons.conf.IoTDBConstant" /> <allow class="org.apache.iotdb.db.conf.IoTDBDescriptor" /> <allow class="org.apache.iotdb.db.conf.OperationType" /> - <allow class="org.apache.iotdb.db.utils.EnvironmentUtils" /> + <allow class="org.apache.iotdb.commons.cluster.NodeStatus" /> <allow class="org.apache.iotdb.tsfile.common.constant.TsFileConstant" /> <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp" /> <allow class="org.apache.iotdb.service.rpc.thrift.TSConnectionType" /> diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java index 8ec75c86d3..a57a3d1570 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java @@ -50,8 +50,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -61,17 +59,12 @@ import java.util.Arrays; @Category({ClusterIT.class}) public class IoTDBClusterNodeErrorStartUpIT { - private static final Logger logger = - LoggerFactory.getLogger(IoTDBClusterNodeErrorStartUpIT.class); - private static final int testConfigNodeNum = 3; private static final int testDataNodeNum = 1; - private static final int testNodeNum = testConfigNodeNum + testDataNodeNum; private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; private static final String TEST_CLUSTER_NAME = "defaultCluster"; private static final String ERROR_CLUSTER_NAME = "errorCluster"; - private static final int maxRetryTimes = 60; @Before public void setUp() throws Exception { @@ -328,27 +321,4 @@ public class IoTDBClusterNodeErrorStartUpIT { Arrays.asList(NodeStatus.Running, NodeStatus.Running)); } } - - private String showClusterStatus(TShowClusterResp showClusterResp) { - StringBuilder sb = new StringBuilder(); - showClusterResp - .getConfigNodeList() - .forEach( - d -> - sb.append("ConfigNode") - .append(d.getInternalEndPoint().getPort()) - .append(": ") - .append(showClusterResp.getNodeStatus().get(d.getConfigNodeId())) - .append("\n")); - showClusterResp - .getDataNodeList() - .forEach( - d -> - sb.append("DataNode") - .append(d.getClientRpcEndPoint().getPort()) - .append(": ") - .append(showClusterResp.getNodeStatus().get(d.getDataNodeId())) - .append("\n")); - return sb.toString(); - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java index 5e729bf073..ef11420d19 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java @@ -18,39 +18,28 @@ */ package org.apache.iotdb.session.it.pool; -import org.apache.iotdb.commons.client.exception.ClientManagerException; -import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; -import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.isession.pool.ISessionPool; import org.apache.iotdb.isession.pool.SessionDataSetWrapper; -import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.it.env.EnvFactory; -import org.apache.iotdb.it.env.cluster.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseNodeWrapper; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.thrift.TException; import org.junit.After; -import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -253,14 +242,16 @@ public class SessionPoolIT { } @Test - public void tryIfTheServerIsRestart() - throws InterruptedException, TException, ClientManagerException, IOException { + public void tryIfTheServerIsRestart() { ISessionPool pool = EnvFactory.getEnv().getSessionPool(3); SessionDataSetWrapper wrapper = null; + BaseNodeWrapper node = EnvFactory.getEnv().getDataNodeWrapper(0); try { wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1"); - EnvFactory.getEnv().getDataNodeWrapper(0).stop(); - EnvFactory.getEnv().getDataNodeWrapper(0).waitingToShutDown(); + node.stop(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Unknown)); // user does not know what happens. while (wrapper.hasNext()) { wrapper.next(); @@ -268,11 +259,14 @@ public class SessionPoolIT { } catch (IoTDBConnectionException e) { pool.closeResultSet(wrapper); pool.close(); - EnvFactory.getEnv().getDataNodeWrapper(0).stop(); - EnvFactory.getEnv().getDataNodeWrapper(0).waitingToShutDown(); - Assert.assertTrue(waitDataNodeStatusUnknown(EnvFactory.getEnv().getDataNodeWrapper(0))); - EnvFactory.getEnv().getDataNodeWrapper(0).start(); - TimeUnit.SECONDS.sleep(10); + node.stop(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Unknown)); + node.start(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Running)); pool = EnvFactory.getEnv().getSessionPool(3); correctQuery(pool, DEFAULT_QUERY_TIMEOUT); pool.close(); @@ -291,10 +285,14 @@ public class SessionPoolIT { } catch (IoTDBConnectionException ec) { pool.closeResultSet(wrapper); pool.close(); - EnvFactory.getEnv().getDataNodeWrapper(0).stop(); - EnvFactory.getEnv().getDataNodeWrapper(0).waitingToShutDown(); - Assert.assertTrue(waitDataNodeStatusUnknown(EnvFactory.getEnv().getDataNodeWrapper(0))); - EnvFactory.getEnv().getDataNodeWrapper(0).start(); + node.stop(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Unknown)); + node.start(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Running)); pool = EnvFactory.getEnv().getSessionPool(3); correctQuery(pool, DEFAULT_QUERY_TIMEOUT); pool.close(); @@ -312,24 +310,8 @@ public class SessionPoolIT { } @Test - @Ignore public void tryIfTheServerIsRestartButDataIsGotten() { - SessionPool pool = - new SessionPool( - "127.0.0.1", - 6667, - "root", - "root", - 3, - 1, - 60000, - false, - null, - false, - SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS, - SessionConfig.DEFAULT_VERSION, - SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY, - SessionConfig.DEFAULT_MAX_FRAME_SIZE); + ISessionPool pool = EnvFactory.getEnv().getSessionPool(3); write10Data(pool, true); assertEquals(1, pool.currentAvailableSize()); SessionDataSetWrapper wrapper; @@ -353,20 +335,24 @@ public class SessionPoolIT { } @Test - public void restart() - throws TException, ClientManagerException, IOException, InterruptedException { + public void restart() { ISessionPool pool = EnvFactory.getEnv().getSessionPool(1); write10Data(pool, true); // stop the server. pool.close(); - EnvFactory.getEnv().getDataNodeWrapper(0).stop(); - EnvFactory.getEnv().getDataNodeWrapper(0).waitingToShutDown(); + BaseNodeWrapper node = EnvFactory.getEnv().getDataNodeWrapper(0); + node.stop(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Unknown)); pool = EnvFactory.getEnv().getSessionPool(1); // all this ten data will fail. write10Data(pool, false); // restart the server - Assert.assertTrue(waitDataNodeStatusUnknown(EnvFactory.getEnv().getDataNodeWrapper(0))); - EnvFactory.getEnv().getDataNodeWrapper(0).start(); + node.start(); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(node), Collections.singletonList(NodeStatus.Running)); write10Data(pool, true); pool.close(); } @@ -410,56 +396,9 @@ public class SessionPoolIT { // session. } - @Test - public void testBuilder() { - SessionPool pool = - new SessionPool.Builder() - .host("localhost") - .port(1234) - .maxSize(10) - .user("abc") - .password("123") - .fetchSize(1) - .waitToGetSessionTimeoutInMs(2) - .enableRedirection(true) - .enableCompression(true) - .zoneId(ZoneOffset.UTC) - .connectionTimeoutInMs(3) - .version(Version.V_0_13) - .build(); - - assertEquals("localhost", pool.getHost()); - assertEquals(1234, pool.getPort()); - assertEquals("abc", pool.getUser()); - assertEquals("123", pool.getPassword()); - assertEquals(10, pool.getMaxSize()); - assertEquals(1, pool.getFetchSize()); - assertEquals(2, pool.getWaitToGetSessionTimeoutInMs()); - assertTrue(pool.isEnableRedirection()); - assertTrue(pool.isEnableCompression()); - assertEquals(3, pool.getConnectionTimeoutInMs()); - assertEquals(ZoneOffset.UTC, pool.getZoneId()); - assertEquals(Version.V_0_13, pool.getVersion()); - } - @Test public void testSetters() { - SessionPool pool = - new SessionPool( - "127.0.0.1", - 6667, - "root", - "root", - 3, - 1, - 60000, - false, - null, - false, - SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS, - SessionConfig.DEFAULT_VERSION, - SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY, - SessionConfig.DEFAULT_MAX_FRAME_SIZE); + ISessionPool pool = EnvFactory.getEnv().getSessionPool(3); try { pool.setEnableRedirection(true); assertTrue(pool.isEnableRedirection()); @@ -474,29 +413,10 @@ public class SessionPoolIT { pool.setFetchSize(16); assertEquals(16, pool.getFetchSize()); } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } finally { pool.close(); } } - - private boolean waitDataNodeStatusUnknown(DataNodeWrapper dataNode) - throws ClientManagerException, IOException, InterruptedException, TException { - try (SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - // At least wait 20 seconds - for (int count = 0; count < 30; count++) { - TShowDataNodesResp showDataNodesResp = client.showDataNodes(); - for (TDataNodeInfo dataNodeInfo : showDataNodesResp.getDataNodesInfoList()) { - if (dataNodeInfo.getRpcAddresss().equals(dataNode.getIp()) - && dataNodeInfo.getRpcPort() == dataNode.getPort() - && NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) { - return true; - } - } - TimeUnit.SECONDS.sleep(1); - } - } - return false; - } } diff --git a/session/src/test/java/org/apache/iotdb/session/TabletTest.java b/session/src/test/java/org/apache/iotdb/session/TabletTest.java new file mode 100644 index 0000000000..1ab00b3c52 --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/TabletTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; + +public class TabletTest { + @Test + public void testSortTablet() { + Session session = new Session("127.0.0.1", 1234); + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE)); + // insert three rows data + Tablet tablet = new Tablet("root.sg1.d1", schemaList, 3); + long[] timestamps = tablet.timestamps; + Object[] values = tablet.values; + + /* + inorder data before inserting + timestamp s1 + 2 0 + 0 1 + 1 2 + */ + // inorder timestamps + timestamps[0] = 2; + timestamps[1] = 0; + timestamps[2] = 1; + // just one column INT64 data + long[] sensor = (long[]) values[0]; + sensor[0] = 0; + sensor[1] = 1; + sensor[2] = 2; + tablet.rowSize = 3; + + session.sortTablet(tablet); + + /* + After sorting, if the tablet data is sorted according to the timestamps, + data in tablet will be + timestamp s1 + 0 1 + 1 2 + 2 0 + + If the data equal to above tablet, test pass, otherwise test fialed + */ + long[] resTimestamps = tablet.timestamps; + long[] resValues = (long[]) tablet.values[0]; + long[] expectedTimestamps = new long[] {0, 1, 2}; + long[] expectedValues = new long[] {1, 2, 0}; + assertArrayEquals(expectedTimestamps, resTimestamps); + assertArrayEquals(expectedValues, resValues); + } +} diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java new file mode 100644 index 0000000000..9e00440895 --- /dev/null +++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.session.pool; + +import org.apache.iotdb.isession.util.Version; + +import org.junit.Test; + +import java.time.ZoneOffset; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SessionPoolTest { + + @Test + public void testBuilder() { + SessionPool pool = + new SessionPool.Builder() + .host("localhost") + .port(1234) + .maxSize(10) + .user("abc") + .password("123") + .fetchSize(1) + .waitToGetSessionTimeoutInMs(2) + .enableRedirection(true) + .enableCompression(true) + .zoneId(ZoneOffset.UTC) + .connectionTimeoutInMs(3) + .version(Version.V_0_13) + .build(); + + assertEquals("localhost", pool.getHost()); + assertEquals(1234, pool.getPort()); + assertEquals("abc", pool.getUser()); + assertEquals("123", pool.getPassword()); + assertEquals(10, pool.getMaxSize()); + assertEquals(1, pool.getFetchSize()); + assertEquals(2, pool.getWaitToGetSessionTimeoutInMs()); + assertTrue(pool.isEnableRedirection()); + assertTrue(pool.isEnableCompression()); + assertEquals(3, pool.getConnectionTimeoutInMs()); + assertEquals(ZoneOffset.UTC, pool.getZoneId()); + assertEquals(Version.V_0_13, pool.getVersion()); + } +}
