This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e66320a169 [IOTDB-2988] Cache leader in session (#6083)
e66320a169 is described below

commit e66320a1697b4b652b7930b73490e83e580027a9
Author: Mrquan <[email protected]>
AuthorDate: Thu Jun 2 08:52:18 2022 +0800

    [IOTDB-2988] Cache leader in session (#6083)
---
 .../db/mpp/plan/execution/QueryExecution.java      |  48 ++++++-
 .../plan/statement/crud/InsertBaseStatement.java   |   4 +
 .../crud/InsertMultiTabletsStatement.java          |  18 +++
 .../plan/statement/crud/InsertRowStatement.java    |  12 ++
 .../crud/InsertRowsOfOneDeviceStatement.java       |   9 ++
 .../plan/statement/crud/InsertRowsStatement.java   |  17 +++
 .../plan/statement/crud/InsertTabletStatement.java |  14 ++
 .../java/org/apache/iotdb/session/Session.java     |  43 +-----
 .../apache/iotdb/session/SessionConnection.java    |   8 +-
 .../apache/iotdb/session/SessionCacheLeaderUT.java | 150 ++-------------------
 10 files changed, 131 insertions(+), 192 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 254b38f93a..06643f3c2c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.db.mpp.plan.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -48,6 +50,9 @@ import 
org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -328,13 +333,7 @@ public class QueryExecution implements IQueryExecution {
     try {
       QueryState state = future.get();
       // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't 
FINISHED
-      TSStatusCode statusCode =
-          // For WRITE, the state should be FINISHED; For READ, the state 
could be RUNNING
-          state == QueryState.FINISHED || state == QueryState.RUNNING
-              ? TSStatusCode.SUCCESS_STATUS
-              : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionResult(
-          context.getQueryId(), RpcUtils.getStatus(statusCode, 
stateMachine.getFailureMessage()));
+      return getExecutionResult(state);
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
       if (e instanceof InterruptedException) {
@@ -360,6 +359,41 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  private ExecutionResult getExecutionResult(QueryState state) {
+    TSStatusCode statusCode =
+        // For WRITE, the state should be FINISHED; For READ, the state could 
be RUNNING
+        state == QueryState.FINISHED || state == QueryState.RUNNING
+            ? TSStatusCode.SUCCESS_STATUS
+            : TSStatusCode.QUERY_PROCESS_ERROR;
+
+    TSStatus tsstatus = RpcUtils.getStatus(statusCode, 
stateMachine.getFailureMessage());
+
+    // collect redirect info to client for writing
+    if (analysis.getStatement() instanceof InsertBaseStatement) {
+      InsertBaseStatement insertStatement = (InsertBaseStatement) 
analysis.getStatement();
+      List<TEndPoint> redirectNodeList =
+          insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+      if (insertStatement instanceof InsertRowsStatement
+          || insertStatement instanceof InsertMultiTabletsStatement) {
+        // multiple devices
+        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          List<TSStatus> subStatus = new ArrayList<>();
+          tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+          for (TEndPoint endPoint : redirectNodeList) {
+            subStatus.add(
+                
StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+          }
+          tsstatus.setSubStatus(subStatus);
+        }
+      } else {
+        // single device
+        tsstatus.setRedirectNode(redirectNodeList.get(0));
+      }
+    }
+
+    return new ExecutionResult(context.getQueryId(), tsstatus);
+  }
+
   public DistributedQueryPlan getDistributedPlan() {
     return distributedPlan;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 7d4960e109..f418b604c8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -75,4 +77,6 @@ public abstract class InsertBaseStatement extends Statement {
   public List<PartialPath> getPaths() {
     return Collections.emptyList();
   }
+
+  public abstract List<TEndPoint> collectRedirectInfo(DataPartition 
dataPartition);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index e4bfac0e95..c3a548b27a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -19,7 +19,11 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -83,4 +87,18 @@ public class InsertMultiTabletsStatement extends 
InsertBaseStatement {
     }
     return result;
   }
+
+  @Override
+  public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+    List<TEndPoint> result = new ArrayList<>();
+    for (InsertTabletStatement insertTabletStatement : 
insertTabletStatementList) {
+      TRegionReplicaSet regionReplicaSet =
+          dataPartition.getDataRegionReplicaSetForWriting(
+              insertTabletStatement.devicePath.getFullPath(),
+              StorageEngineV2.getTimePartitionSlot(
+                  
insertTabletStatement.getTimes()[insertTabletStatement.getTimes().length - 1]));
+      
result.add(regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+    }
+    return result;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 6259e88956..08f4f607e5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -19,7 +19,10 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -122,6 +125,15 @@ public class InsertRowStatement extends 
InsertBaseStatement {
     return 
Collections.singletonList(StorageEngineV2.getTimePartitionSlot(time));
   }
 
+  @Override
+  public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+    TRegionReplicaSet regionReplicaSet =
+        dataPartition.getDataRegionReplicaSetForWriting(
+            devicePath.getFullPath(), 
StorageEngineV2.getTimePartitionSlot(time));
+    return Collections.singletonList(
+        regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+  }
+
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRow(this, context);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index dc44a46827..9dee624b36 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
@@ -74,6 +76,13 @@ public class InsertRowsOfOneDeviceStatement extends 
InsertBaseStatement {
     return new ArrayList<>(timePartitionSlotSet);
   }
 
+  @Override
+  public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+    return insertRowStatementList
+        .get(insertRowStatementList.size() - 1)
+        .collectRedirectInfo(dataPartition);
+  }
+
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRowsOfOneDevice(this, context);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index f430cba52c..3b53d90f21 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -19,7 +19,11 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -83,4 +87,17 @@ public class InsertRowsStatement extends InsertBaseStatement 
{
     }
     return result;
   }
+
+  @Override
+  public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+    List<TEndPoint> result = new ArrayList<>();
+    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+      TRegionReplicaSet regionReplicaSet =
+          dataPartition.getDataRegionReplicaSetForWriting(
+              insertRowStatement.devicePath.getFullPath(),
+              
StorageEngineV2.getTimePartitionSlot(insertRowStatement.getTime()));
+      
result.add(regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+    }
+    return result;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index 70f4f6d9e1..798c2ad328 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -18,13 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class InsertTabletStatement extends InsertBaseStatement {
@@ -88,6 +92,16 @@ public class InsertTabletStatement extends 
InsertBaseStatement {
     return result;
   }
 
+  @Override
+  public List<TEndPoint> collectRedirectInfo(DataPartition dataPartition) {
+    TRegionReplicaSet regionReplicaSet =
+        dataPartition.getDataRegionReplicaSetForWriting(
+            devicePath.getFullPath(),
+            StorageEngineV2.getTimePartitionSlot(times[times.length - 1]));
+    return Collections.singletonList(
+        regionReplicaSet.getDataNodeLocations().get(0).getExternalEndPoint());
+  }
+
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertTablet(this, context);
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 60cf271c7f..854450f85d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -127,7 +127,6 @@ public class Session {
 
   // Cluster version cache
   protected boolean enableCacheLeader;
-  protected SessionConnection metaSessionConnection;
   protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
   protected volatile Map<TEndPoint, SessionConnection> 
endPointToSessionConnection;
 
@@ -402,7 +401,6 @@ public class Session {
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     defaultSessionConnection = constructSessionConnection(this, 
defaultEndPoint, zoneId);
     defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
-    metaSessionConnection = defaultSessionConnection;
     isClosed = false;
     if (enableCacheLeader || enableQueryRedirection) {
       deviceIdToEndpoint = new ConcurrentHashMap<>();
@@ -447,29 +445,17 @@ public class Session {
 
   public void setStorageGroup(String storageGroup)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      metaSessionConnection.setStorageGroup(storageGroup);
-    } catch (RedirectException e) {
-      handleMetaRedirection(storageGroup, e);
-    }
+    defaultSessionConnection.setStorageGroup(storageGroup);
   }
 
   public void deleteStorageGroup(String storageGroup)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      
metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
-    } catch (RedirectException e) {
-      handleMetaRedirection(storageGroup, e);
-    }
+    
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
   }
 
   public void deleteStorageGroups(List<String> storageGroups)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      metaSessionConnection.deleteStorageGroups(storageGroups);
-    } catch (RedirectException e) {
-      handleMetaRedirection(storageGroups.toString(), e);
-    }
+    defaultSessionConnection.deleteStorageGroups(storageGroups);
   }
 
   public void createTimeseries(
@@ -874,29 +860,6 @@ public class Session {
     }
   }
 
-  private void handleMetaRedirection(String storageGroup, RedirectException e)
-      throws IoTDBConnectionException {
-    if (enableCacheLeader) {
-      logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
-      AtomicReference<IoTDBConnectionException> exceptionReference = new 
AtomicReference<>();
-      SessionConnection connection =
-          endPointToSessionConnection.computeIfAbsent(
-              e.getEndPoint(),
-              k -> {
-                try {
-                  return constructSessionConnection(this, e.getEndPoint(), 
zoneId);
-                } catch (IoTDBConnectionException ex) {
-                  exceptionReference.set(ex);
-                  return null;
-                }
-              });
-      if (connection == null) {
-        throw new IoTDBConnectionException(exceptionReference.get());
-      }
-      metaSessionConnection = connection;
-    }
-  }
-
   private void handleRedirection(String deviceId, TEndPoint endpoint)
       throws IoTDBConnectionException {
     if (enableCacheLeader) {
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 76c3f3ce7c..0f0741ec21 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -222,9 +222,9 @@ public class SessionConnection {
   }
 
   protected void setStorageGroup(String storageGroup)
-      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+      throws IoTDBConnectionException, StatementExecutionException {
     try {
-      RpcUtils.verifySuccessWithRedirection(client.setStorageGroup(sessionId, 
storageGroup));
+      RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
     } catch (TException e) {
       if (reconnect()) {
         try {
@@ -239,9 +239,9 @@ public class SessionConnection {
   }
 
   protected void deleteStorageGroups(List<String> storageGroups)
-      throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
+      throws IoTDBConnectionException, StatementExecutionException {
     try {
-      
RpcUtils.verifySuccessWithRedirection(client.deleteStorageGroups(sessionId, 
storageGroups));
+      RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroups));
     } catch (TException e) {
       if (reconnect()) {
         try {
diff --git 
a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java 
b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 47f3343c6c..a882ca6093 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
@@ -78,89 +77,11 @@ public class SessionCacheLeaderUT {
     return endpoints.get(deviceId.hashCode() % endpoints.size());
   }
 
-  @Test
-  public void testSetStorageGroup() throws IoTDBConnectionException, 
StatementExecutionException {
-    // without leader cache
-    session = new MockSession("127.0.0.1", 55560, false);
-    session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertNull(session.deviceIdToEndpoint);
-    assertNull(session.endPointToSessionConnection);
-
-    session.setStorageGroup("root.sg1");
-
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertNull(session.deviceIdToEndpoint);
-    assertNull(session.endPointToSessionConnection);
-    session.close();
-
-    // with leader cache
-    session = new MockSession("127.0.0.1", 55560, true);
-    session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertEquals(0, session.deviceIdToEndpoint.size());
-    assertEquals(1, session.endPointToSessionConnection.size());
-
-    session.setStorageGroup("root.sg1");
-
-    assertNotEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertEquals(0, session.deviceIdToEndpoint.size());
-    assertEquals(2, session.endPointToSessionConnection.size());
-    session.close();
-  }
-
-  @Test
-  public void testDeleteStorageGroups()
-      throws IoTDBConnectionException, StatementExecutionException {
-    // without leader cache
-    session = new MockSession("127.0.0.1", 55560, false);
-    session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertNull(session.deviceIdToEndpoint);
-    assertNull(session.endPointToSessionConnection);
-
-    session.deleteStorageGroups(
-        new ArrayList<String>() {
-          {
-            add("root.sg1");
-            add("root.sg2");
-            add("root.sg3");
-          }
-        });
-
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertNull(session.deviceIdToEndpoint);
-    assertNull(session.endPointToSessionConnection);
-    session.close();
-
-    // with leader cache
-    session = new MockSession("127.0.0.1", 55560, true);
-    session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertEquals(0, session.deviceIdToEndpoint.size());
-    assertEquals(1, session.endPointToSessionConnection.size());
-
-    session.deleteStorageGroups(
-        new ArrayList<String>() {
-          {
-            add("root.sg1");
-            add("root.sg2");
-            add("root.sg3");
-          }
-        });
-
-    assertNotEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
-    assertEquals(0, session.deviceIdToEndpoint.size());
-    assertEquals(2, session.endPointToSessionConnection.size());
-    session.close();
-  }
-
   @Test
   public void testInsertRecord() throws IoTDBConnectionException, 
StatementExecutionException {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -181,7 +102,6 @@ public class SessionCacheLeaderUT {
       session.insertRecord(deviceId, time, measurements, types, values);
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -189,7 +109,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -201,7 +120,6 @@ public class SessionCacheLeaderUT {
       session.insertRecord(deviceId, time, measurements, types, values);
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(1, session.deviceIdToEndpoint.size());
     assertEquals(getDeviceIdBelongedEndpoint(deviceId), 
session.deviceIdToEndpoint.get(deviceId));
     assertEquals(2, session.endPointToSessionConnection.size());
@@ -214,7 +132,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -231,7 +148,6 @@ public class SessionCacheLeaderUT {
       session.insertRecord(deviceId, time, measurements, values);
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -239,7 +155,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -251,7 +166,6 @@ public class SessionCacheLeaderUT {
       session.insertRecord(deviceId, time, measurements, values);
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(1, session.deviceIdToEndpoint.size());
     assertEquals(getDeviceIdBelongedEndpoint(deviceId), 
session.deviceIdToEndpoint.get(deviceId));
     assertEquals(2, session.endPointToSessionConnection.size());
@@ -263,7 +177,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -317,7 +230,6 @@ public class SessionCacheLeaderUT {
     valuesList.clear();
     timestamps.clear();
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -325,7 +237,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -353,7 +264,6 @@ public class SessionCacheLeaderUT {
     }
     session.insertRecords(deviceIds, timestamps, measurementsList, typesList, 
valuesList);
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(4, session.deviceIdToEndpoint.size());
     for (String deviceId : allDeviceIds) {
       assertEquals(getDeviceIdBelongedEndpoint(deviceId), 
session.deviceIdToEndpoint.get(deviceId));
@@ -368,7 +278,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -412,7 +321,6 @@ public class SessionCacheLeaderUT {
     valuesList.clear();
     timestamps.clear();
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -420,7 +328,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -443,7 +350,6 @@ public class SessionCacheLeaderUT {
     }
     session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(4, session.deviceIdToEndpoint.size());
     for (String deviceId : allDeviceIds) {
       assertEquals(getDeviceIdBelongedEndpoint(deviceId), 
session.deviceIdToEndpoint.get(deviceId));
@@ -458,7 +364,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -505,7 +410,6 @@ public class SessionCacheLeaderUT {
         Boolean.TRUE);
     session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes, 
values);
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -513,13 +417,11 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
     session.insertRecordsOfOneDevice(deviceId, times, measurements, datatypes, 
values);
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(1, session.deviceIdToEndpoint.size());
     assertEquals(2, session.endPointToSessionConnection.size());
     session.close();
@@ -530,7 +432,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -561,7 +462,6 @@ public class SessionCacheLeaderUT {
       tablet.reset();
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -569,7 +469,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -593,7 +492,6 @@ public class SessionCacheLeaderUT {
       tablet.reset();
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(1, session.deviceIdToEndpoint.size());
     assertEquals(2, session.endPointToSessionConnection.size());
     session.close();
@@ -604,7 +502,6 @@ public class SessionCacheLeaderUT {
     // without leader cache
     session = new MockSession("127.0.0.1", 55560, false);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -661,7 +558,6 @@ public class SessionCacheLeaderUT {
       tablet3.reset();
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     session.close();
@@ -669,7 +565,6 @@ public class SessionCacheLeaderUT {
     // with leader cache
     session = new MockSession("127.0.0.1", 55560, true);
     session.open();
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -702,7 +597,6 @@ public class SessionCacheLeaderUT {
       tablet3.reset();
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(3, session.deviceIdToEndpoint.size());
     for (String deviceId : allDeviceIds.subList(1, allDeviceIds.size())) {
       assertEquals(getDeviceIdBelongedEndpoint(deviceId), 
session.deviceIdToEndpoint.get(deviceId));
@@ -720,7 +614,6 @@ public class SessionCacheLeaderUT {
     } catch (IoTDBConnectionException e) {
       fail(e.getMessage());
     }
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     ((MockSession) 
session).getLastConstructedSessionConnection().setConnectionBroken(true);
@@ -765,7 +658,7 @@ public class SessionCacheLeaderUT {
           session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
         } catch (IoTDBConnectionException e) {
           Assert.assertEquals(
-              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is 
broken",
+              "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken",
               e.getMessage());
         }
         deviceIds.clear();
@@ -779,21 +672,20 @@ public class SessionCacheLeaderUT {
       session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
     } catch (IoTDBConnectionException e) {
       Assert.assertEquals(
-          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
+          "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
     }
     deviceIds.clear();
     measurementsList.clear();
     valuesList.clear();
     timestamps.clear();
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     try {
       session.close();
     } catch (IoTDBConnectionException e) {
       Assert.assertEquals(
-          "the session connection = EndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
+          "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken", e.getMessage());
     }
 
     // with leader cache
@@ -804,7 +696,6 @@ public class SessionCacheLeaderUT {
     } catch (IoTDBConnectionException e) {
       Assert.fail(e.getMessage());
     }
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
     for (long time = 0; time < 500; time++) {
@@ -836,18 +727,17 @@ public class SessionCacheLeaderUT {
 
     // set connection as broken, due to we enable the cache leader, when we 
called
     // ((MockSession) session).getLastConstructedSessionConnection(), the 
session's endpoint has
-    // been changed to EndPoint(ip:127.0.0.1, port:55562)
+    // been changed to TEndPoint(ip:127.0.0.1, port:55562)
     Assert.assertEquals(
-        "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+        "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
         ((MockSession) 
session).getLastConstructedSessionConnection().toString());
     ((MockSession) 
session).getLastConstructedSessionConnection().setConnectionBroken(true);
     try {
       session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
     } catch (IoTDBConnectionException e) {
       Assert.assertEquals(
-          "the session connection = EndPoint(ip:127.0.0.1, port:55562) is 
broken", e.getMessage());
+          "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is 
broken", e.getMessage());
     }
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(3, session.deviceIdToEndpoint.size());
     for (Map.Entry<String, TEndPoint> endPointMap : 
session.deviceIdToEndpoint.entrySet()) {
       assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()), 
endPointMap.getValue());
@@ -869,7 +759,6 @@ public class SessionCacheLeaderUT {
     } catch (IoTDBConnectionException e) {
       Assert.fail(e.getMessage());
     }
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
 
@@ -917,7 +806,7 @@ public class SessionCacheLeaderUT {
           session.insertTablets(tabletMap, true);
         } catch (IoTDBConnectionException e) {
           assertEquals(
-              "the session connection = EndPoint(ip:127.0.0.1, port:55560) is 
broken",
+              "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is 
broken",
               e.getMessage());
         }
         tablet1.reset();
@@ -939,7 +828,6 @@ public class SessionCacheLeaderUT {
       tablet3.reset();
     }
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertNull(session.deviceIdToEndpoint);
     assertNull(session.endPointToSessionConnection);
     try {
@@ -956,7 +844,6 @@ public class SessionCacheLeaderUT {
     } catch (IoTDBConnectionException e) {
       Assert.fail(e.getMessage());
     }
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(0, session.deviceIdToEndpoint.size());
     assertEquals(1, session.endPointToSessionConnection.size());
 
@@ -992,7 +879,7 @@ public class SessionCacheLeaderUT {
     // ((MockSession) session).getLastConstructedSessionConnection(), the 
session's endpoint has
     // been changed to EndPoint(ip:127.0.0.1, port:55562)
     Assert.assertEquals(
-        "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+        "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
         ((MockSession) 
session).getLastConstructedSessionConnection().toString());
 
     for (long row = 0; row < 10; row++) {
@@ -1024,13 +911,12 @@ public class SessionCacheLeaderUT {
       session.insertTablets(tabletMap, true);
     } catch (IoTDBConnectionException e) {
       Assert.assertEquals(
-          "the session connection = EndPoint(ip:127.0.0.1, port:55562) is 
broken", e.getMessage());
+          "the session connection = TEndPoint(ip:127.0.0.1, port:55562) is 
broken", e.getMessage());
     }
     tablet1.reset();
     tablet2.reset();
     tablet3.reset();
 
-    assertEquals(session.metaSessionConnection, 
session.defaultSessionConnection);
     assertEquals(2, session.deviceIdToEndpoint.size());
     for (Map.Entry<String, TEndPoint> endPointEntry : 
session.deviceIdToEndpoint.entrySet()) {
       assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()), 
endPointEntry.getValue());
@@ -1117,24 +1003,6 @@ public class SessionCacheLeaderUT {
     @Override
     public void close() {}
 
-    @Override
-    protected void setStorageGroup(String storageGroup)
-        throws RedirectException, IoTDBConnectionException {
-      if (isConnectionBroken()) {
-        throw ioTDBConnectionException;
-      }
-      throw new RedirectException(endpoints.get(1));
-    }
-
-    @Override
-    protected void deleteStorageGroups(List<String> storageGroups)
-        throws RedirectException, IoTDBConnectionException {
-      if (isConnectionBroken()) {
-        throw ioTDBConnectionException;
-      }
-      throw new RedirectException(endpoints.get(1));
-    }
-
     @Override
     protected void insertRecord(TSInsertRecordReq request)
         throws RedirectException, IoTDBConnectionException {

Reply via email to