This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e66320a169 [IOTDB-2988] Cache leader in session (#6083)
e66320a169 is described below
commit e66320a1697b4b652b7930b73490e83e580027a9
Author: Mrquan <[email protected]>
AuthorDate: Thu Jun 2 08:52:18 2022 +0800
[IOTDB-2988] Cache leader in session (#6083)
---
.../db/mpp/plan/execution/QueryExecution.java | 48 ++++++-
.../plan/statement/crud/InsertBaseStatement.java | 4 +
.../crud/InsertMultiTabletsStatement.java | 18 +++
.../plan/statement/crud/InsertRowStatement.java | 12 ++
.../crud/InsertRowsOfOneDeviceStatement.java | 9 ++
.../plan/statement/crud/InsertRowsStatement.java | 17 +++
.../plan/statement/crud/InsertTabletStatement.java | 14 ++
.../java/org/apache/iotdb/session/Session.java | 43 +-----
.../apache/iotdb/session/SessionConnection.java | 8 +-
.../apache/iotdb/session/SessionCacheLeaderUT.java | 150 ++-------------------
10 files changed, 131 insertions(+), 192 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 254b38f93a..06643f3c2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.mpp.plan.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -48,6 +50,9 @@ import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -328,13 +333,7 @@ public class QueryExecution implements IQueryExecution {
try {
QueryState state = future.get();
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
- TSStatusCode statusCode =
- // For WRITE, the state should be FINISHED; For READ, the state could be RUNNING
- state == QueryState.FINISHED || state == QueryState.RUNNING
- ? TSStatusCode.SUCCESS_STATUS
- : TSStatusCode.QUERY_PROCESS_ERROR;
- return new ExecutionResult(
- context.getQueryId(), RpcUtils.getStatus(statusCode,
stateMachine.getFailureMessage()));
+ return getExecutionResult(state);
} catch (InterruptedException | ExecutionException e) {
// TODO: (xingtanzjr) use more accurate error handling
if (e instanceof InterruptedException) {
@@ -360,6 +359,41 @@ public class QueryExecution implements IQueryExecution {
}
}
+ private ExecutionResult getExecutionResult(QueryState state) {
+ TSStatusCode statusCode =
+ // For WRITE, the state should be FINISHED; For READ, the state could be RUNNING
+ state == QueryState.FINISHED || state == QueryState.RUNNING
+ ? TSStatusCode.SUCCESS_STATUS
+ : TSStatusCode.QUERY_PROCESS_ERROR;
+
+ TSStatus tsstatus = RpcUtils.getStatus(statusCode,
stateMachine.getFailureMessage());
+
+ // collect redirect info to client for writing
+ if (analysis.getStatement() instanceof InsertBaseStatement) {
+ InsertBaseStatement insertStatement = (InsertBaseStatement)
analysis.getStatement();
+ List<TEndPoint> redirectNodeList =
+ insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+ if (insertStatement instanceof InsertRowsStatement
+ || insertStatement instanceof InsertMultiTabletsStatement) {
+ // multiple devices
+ if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ List<TSStatus> subStatus = new ArrayList<>();
+ tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+ for (TEndPoint endPoint : redirectNodeList) {
+ subStatus.add(
+
StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+ }
+ tsstatus.setSubStatus(subStatus);
+ }
+ } else {
+ // single device
+ tsstatus.setRedirectNode(redirectNodeList.get(0));
+ }
+ }
+
+ return new ExecutionResult(context.getQueryId(), tsstatus);
+ }
+
public DistributedQueryPlan getDistributedPlan() {
return distributedPlan;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 7d4960e109..f418b604c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,4 +77,6 @@ public abstract class InsertBaseStatement extends Statement {
public List<PartialPath> getPaths() {
return Collections.emptyList();
}
+
+ public abstract List<TEndPoint> collectRedirectInfo(DataPartition
dataPartition);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index e4bfac0e95..c3a548b27a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -19,7 +19,11 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -83,4 +87,18 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement {
}
return result;
}
+
+ @Override
+ public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+ List<TEndPoint> result = new ArrayList<>();
+ for (InsertTabletStatement insertTabletStatement :
insertTabletStatementList) {
+ TRegionReplicaSet regionReplicaSet =
+ dataPartition.getDataRegionReplicaSetForWriting(
+ insertTabletStatement.devicePath.getFullPath(),
+ StorageEngineV2.getTimePartitionSlot(
+
insertTabletStatement.getTimes()[insertTabletStatement.getTimes().length - 1]));
+
result.add(regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+ }
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 6259e88956..08f4f607e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -19,7 +19,10 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -122,6 +125,15 @@ public class InsertRowStatement extends InsertBaseStatement {
return
Collections.singletonList(StorageEngineV2.getTimePartitionSlot(time));
}
+ @Override
+ public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+ TRegionReplicaSet regionReplicaSet =
+ dataPartition.getDataRegionReplicaSetForWriting(
+ devicePath.getFullPath(),
StorageEngineV2.getTimePartitionSlot(time));
+ return Collections.singletonList(
+ regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+ }
+
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRow(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index dc44a46827..9dee624b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
@@ -74,6 +76,13 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
return new ArrayList<>(timePartitionSlotSet);
}
+ @Override
+ public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+ return insertRowStatementList
+ .get(insertRowStatementList.size() - 1)
+ .collectRedirectInfo(dataPartition);
+ }
+
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRowsOfOneDevice(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index f430cba52c..3b53d90f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -19,7 +19,11 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -83,4 +87,17 @@ public class InsertRowsStatement extends InsertBaseStatement {
}
return result;
}
+
+ @Override
+ public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+ List<TEndPoint> result = new ArrayList<>();
+ for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+ TRegionReplicaSet regionReplicaSet =
+ dataPartition.getDataRegionReplicaSetForWriting(
+ insertRowStatement.devicePath.getFullPath(),
+
StorageEngineV2.getTimePartitionSlot(insertRowStatement.getTime()));
+
result.add(regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+ }
+ return result;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index 70f4f6d9e1..798c2ad328 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -18,13 +18,17 @@
*/
package org.apache.iotdb.db.mpp.plan.statement.crud;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.tsfile.utils.BitMap;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class InsertTabletStatement extends InsertBaseStatement {
@@ -88,6 +92,16 @@ public class InsertTabletStatement extends InsertBaseStatement {
return result;
}
+ @Override
+ public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+ TRegionReplicaSet regionReplicaSet =
+ dataPartition.getDataRegionReplicaSetForWriting(
+ devicePath.getFullPath(),
+ StorageEngineV2.getTimePartitionSlot(times[times.length - 1]));
+ return Collections.singletonList(
+ regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+ }
+
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertTablet(this, context);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 60cf271c7f..854450f85d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -127,7 +127,6 @@ public class Session {
// Cluster version cache
protected boolean enableCacheLeader;
- protected SessionConnection metaSessionConnection;
protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
protected volatile Map<TEndPoint, SessionConnection>
endPointToSessionConnection;
@@ -402,7 +401,6 @@ public class Session {
this.connectionTimeoutInMs = connectionTimeoutInMs;
defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
- metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
@@ -447,29 +445,17 @@ public class Session {
public void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- metaSessionConnection.setStorageGroup(storageGroup);
- } catch (RedirectException e) {
- handleMetaRedirection(storageGroup, e);
- }
+ defaultSessionConnection.setStorageGroup(storageGroup);
}
public void deleteStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- try {
-
metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
- } catch (RedirectException e) {
- handleMetaRedirection(storageGroup, e);
- }
+
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
}
public void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- metaSessionConnection.deleteStorageGroups(storageGroups);
- } catch (RedirectException e) {
- handleMetaRedirection(storageGroups.toString(), e);
- }
+ defaultSessionConnection.deleteStorageGroups(storageGroups);
}
public void createTimeseries(
@@ -874,29 +860,6 @@ public class Session {
}
}
- private void handleMetaRedirection(String storageGroup, RedirectException e)
- throws IoTDBConnectionException {
- if (enableCacheLeader) {
- logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
- AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
- SessionConnection connection =
- endPointToSessionConnection.computeIfAbsent(
- e.getEndPoint(),
- k -> {
- try {
- return constructSessionConnection(this, e.getEndPoint(),
zoneId);
- } catch (IoTDBConnectionException ex) {
- exceptionReference.set(ex);
- return null;
- }
- });
- if (connection == null) {
- throw new IoTDBConnectionException(exceptionReference.get());
- }
- metaSessionConnection = connection;
- }
- }
-
private void handleRedirection(String deviceId, TEndPoint endpoint)
throws IoTDBConnectionException {
if (enableCacheLeader) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 76c3f3ce7c..0f0741ec21 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -222,9 +222,9 @@ public class SessionConnection {
}
protected void setStorageGroup(String storageGroup)
- throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ throws IoTDBConnectionException, StatementExecutionException {
try {
- RpcUtils.verifySuccessWithRedirection(client.setStorageGroup(sessionId,
storageGroup));
+ RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
} catch (TException e) {
if (reconnect()) {
try {
@@ -239,9 +239,9 @@ public class SessionConnection {
}
protected void deleteStorageGroups(List<String> storageGroups)
- throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ throws IoTDBConnectionException, StatementExecutionException {
try {
-
RpcUtils.verifySuccessWithRedirection(client.deleteStorageGroups(sessionId,
storageGroups));
+ RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId,
storageGroups));
} catch (TException e) {
if (reconnect()) {
try {
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 47f3343c6c..a882ca6093 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -45,7 +45,6 @@ import java.util.Map;
import java.util.Random;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -78,89 +77,11 @@ public class SessionCacheLeaderUT {
return endpoints.get(deviceId.hashCode() % endpoints.size());
}
- @Test
- public void testSetStorageGroup() throws IoTDBConnectionException,
StatementExecutionException {
- // without leader cache
- session = new MockSession("127.0.0.1", 55560, false);
- session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertNull(session.deviceIdToEndpoint);
- assertNull(session.endPointToSessionConnection);
-
- session.setStorageGroup("root.sg1");
-
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertNull(session.deviceIdToEndpoint);
- assertNull(session.endPointToSessionConnection);
- session.close();
-
- // with leader cache
- session = new MockSession("127.0.0.1", 55560, true);
- session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertEquals(0, session.deviceIdToEndpoint.size());
- assertEquals(1, session.endPointToSessionConnection.size());
-
- session.setStorageGroup("root.sg1");
-
- assertNotEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertEquals(0, session.deviceIdToEndpoint.size());
- assertEquals(2, session.endPointToSessionConnection.size());
- session.close();
- }
-
- @Test
- public void testDeleteStorageGroups()
- throws IoTDBConnectionException, StatementExecutionException {
- // without leader cache
- session = new MockSession("127.0.0.1", 55560, false);
- session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertNull(session.deviceIdToEndpoint);
- assertNull(session.endPointToSessionConnection);
-
- session.deleteStorageGroups(
- new ArrayList<String>() {
- {
- add("root.sg1");
- add("root.sg2");
- add("root.sg3");
- }
- });
-
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertNull(session.deviceIdToEndpoint);
- assertNull(session.endPointToSessionConnection);
- session.close();
-
- // with leader cache
- session = new MockSession("127.0.0.1", 55560, true);
- session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertEquals(0, session.deviceIdToEndpoint.size());
- assertEquals(1, session.endPointToSessionConnection.size());
-
- session.deleteStorageGroups(
- new ArrayList<String>() {
- {
- add("root.sg1");
- add("root.sg2");
- add("root.sg3");
- }
- });
-
- assertNotEquals(session.metaSessionConnection,
session.defaultSessionConnection);
- assertEquals(0, session.deviceIdToEndpoint.size());
- assertEquals(2, session.endPointToSessionConnection.size());
- session.close();
- }
-
@Test
public void testInsertRecord() throws IoTDBConnectionException,
StatementExecutionException {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -181,7 +102,6 @@ public class SessionCacheLeaderUT {
session.insertRecord(deviceId, time, measurements, types, values);
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -189,7 +109,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -201,7 +120,6 @@ public class SessionCacheLeaderUT {
session.insertRecord(deviceId, time, measurements, types, values);
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(getDeviceIdBelongedEndpoint(deviceId),
session.deviceIdToEndpoint.get(deviceId));
assertEquals(2, session.endPointToSessionConnection.size());
@@ -214,7 +132,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -231,7 +148,6 @@ public class SessionCacheLeaderUT {
session.insertRecord(deviceId, time, measurements, values);
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -239,7 +155,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -251,7 +166,6 @@ public class SessionCacheLeaderUT {
session.insertRecord(deviceId, time, measurements, values);
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(getDeviceIdBelongedEndpoint(deviceId),
session.deviceIdToEndpoint.get(deviceId));
assertEquals(2, session.endPointToSessionConnection.size());
@@ -263,7 +177,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -317,7 +230,6 @@ public class SessionCacheLeaderUT {
valuesList.clear();
timestamps.clear();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -325,7 +237,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -353,7 +264,6 @@ public class SessionCacheLeaderUT {
}
session.insertRecords(deviceIds, timestamps, measurementsList, typesList,
valuesList);
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(4, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId),
session.deviceIdToEndpoint.get(deviceId));
@@ -368,7 +278,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -412,7 +321,6 @@ public class SessionCacheLeaderUT {
valuesList.clear();
timestamps.clear();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -420,7 +328,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -443,7 +350,6 @@ public class SessionCacheLeaderUT {
}
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(4, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId),
session.deviceIdToEndpoint.get(deviceId));
@@ -458,7 +364,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -505,7 +410,6 @@ public class SessionCacheLeaderUT {
Boolean.TRUE);
session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes,
values);
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -513,13 +417,11 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes,
values);
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
@@ -530,7 +432,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -561,7 +462,6 @@ public class SessionCacheLeaderUT {
tablet.reset();
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -569,7 +469,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -593,7 +492,6 @@ public class SessionCacheLeaderUT {
tablet.reset();
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(1, session.deviceIdToEndpoint.size());
assertEquals(2, session.endPointToSessionConnection.size());
session.close();
@@ -604,7 +502,6 @@ public class SessionCacheLeaderUT {
// without leader cache
session = new MockSession("127.0.0.1", 55560, false);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -661,7 +558,6 @@ public class SessionCacheLeaderUT {
tablet3.reset();
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
session.close();
@@ -669,7 +565,6 @@ public class SessionCacheLeaderUT {
// with leader cache
session = new MockSession("127.0.0.1", 55560, true);
session.open();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -702,7 +597,6 @@ public class SessionCacheLeaderUT {
tablet3.reset();
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(3, session.deviceIdToEndpoint.size());
for (String deviceId : allDeviceIds.subList(1, allDeviceIds.size())) {
assertEquals(getDeviceIdBelongedEndpoint(deviceId),
session.deviceIdToEndpoint.get(deviceId));
@@ -720,7 +614,6 @@ public class SessionCacheLeaderUT {
} catch (IoTDBConnectionException e) {
fail(e.getMessage());
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
@@ -765,7 +658,7 @@ public class SessionCacheLeaderUT {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken",
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
e.getMessage());
}
deviceIds.clear();
@@ -779,21 +672,20 @@ public class SessionCacheLeaderUT {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
}
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
try {
session.close();
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
}
// with leader cache
@@ -804,7 +696,6 @@ public class SessionCacheLeaderUT {
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
for (long time = 0; time < 500; time++) {
@@ -836,18 +727,17 @@ public class SessionCacheLeaderUT {
// set connection as broken, due to we enable the cache leader, when we
called
// ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
- // been changed to EndPoint(ip:127.0.0.1, port:55562)
+ // been changed to TEndPoint(ip:127.0.0.1, port:55562)
Assert.assertEquals(
- "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
((MockSession)
session).getLastConstructedSessionConnection().toString());
((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(3, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointMap :
session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()),
endPointMap.getValue());
@@ -869,7 +759,6 @@ public class SessionCacheLeaderUT {
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
@@ -917,7 +806,7 @@ public class SessionCacheLeaderUT {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken",
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
e.getMessage());
}
tablet1.reset();
@@ -939,7 +828,6 @@ public class SessionCacheLeaderUT {
tablet3.reset();
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertNull(session.deviceIdToEndpoint);
assertNull(session.endPointToSessionConnection);
try {
@@ -956,7 +844,6 @@ public class SessionCacheLeaderUT {
} catch (IoTDBConnectionException e) {
Assert.fail(e.getMessage());
}
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(0, session.deviceIdToEndpoint.size());
assertEquals(1, session.endPointToSessionConnection.size());
@@ -992,7 +879,7 @@ public class SessionCacheLeaderUT {
// ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
// been changed to EndPoint(ip:127.0.0.1, port:55562)
Assert.assertEquals(
- "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
((MockSession)
session).getLastConstructedSessionConnection().toString());
for (long row = 0; row < 10; row++) {
@@ -1024,13 +911,12 @@ public class SessionCacheLeaderUT {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
}
tablet1.reset();
tablet2.reset();
tablet3.reset();
- assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(2, session.deviceIdToEndpoint.size());
for (Map.Entry<String, TEndPoint> endPointEntry :
session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()),
endPointEntry.getValue());
@@ -1117,24 +1003,6 @@ public class SessionCacheLeaderUT {
@Override
public void close() {}
- @Override
- protected void setStorageGroup(String storageGroup)
- throws RedirectException, IoTDBConnectionException {
- if (isConnectionBroken()) {
- throw ioTDBConnectionException;
- }
- throw new RedirectException(endpoints.get(1));
- }
-
- @Override
- protected void deleteStorageGroups(List<String> storageGroups)
- throws RedirectException, IoTDBConnectionException {
- if (isConnectionBroken()) {
- throw ioTDBConnectionException;
- }
- throw new RedirectException(endpoints.get(1));
- }
-
@Override
protected void insertRecord(TSInsertRecordReq request)
throws RedirectException, IoTDBConnectionException {