This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/align-id-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/align-id-2 by this
push:
new f565c51 change to TEndpoint
f565c51 is described below
commit f565c51ef6e6598e8f5dcd11cb91dc54ca2d84fd
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Apr 1 20:57:35 2022 +0800
change to TEndpoint
---
.../iotdb/cluster/coordinator/Coordinator.java | 6 +-
.../cluster/server/member/DataGroupMember.java | 4 +-
.../cluster/server/member/MetaGroupMember.java | 6 +-
.../cluster/server/service/BaseAsyncService.java | 4 +-
.../apache/iotdb/cluster/utils/StatusUtils.java | 6 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 2 +-
.../confignode/conf/ConfigNodeDescriptor.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/partition/DataNodeInfo.java | 2 +-
.../physical/sys/RegisterDataNodePlan.java | 2 +-
.../server/ConfigNodeRPCServerProcessor.java | 6 +-
.../confignode/consensus/RatisConsensusDemo.java | 4 +-
.../manager/ConfigManagerManualTest.java | 6 +-
.../server/ConfigNodeRPCServerProcessorTest.java | 14 +--
.../org/apache/iotdb/consensus/common/Peer.java | 1 +
.../iotdb/consensus/ratis/RatisConsensus.java | 11 ++-
.../org/apache/iotdb/consensus/ratis/Utils.java | 4 +-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 2 +-
.../standalone/StandAloneConsensusTest.java | 2 +-
.../iotdb/commons/partition/DataPartitionInfo.java | 8 +-
.../apache/iotdb/commons/partition}/Endpoint.java | 2 +-
...RegionReplicaSet.java => RegionReplicaSet.java} | 20 ++--
.../commons/partition/SchemaRegionReplicaSet.java | 8 +-
.../apache/iotdb/commons/utils/CommonUtils.java | 10 +-
.../iotdb/db/consensus/ConsensusExample.java | 2 +-
.../apache/iotdb/db/mpp/sql/analyze/Analysis.java | 4 +-
.../mpp/sql/analyze/FakePartitionFetcherImpl.java | 49 +++++-----
.../db/mpp/sql/planner/DistributionPlanner.java | 16 ++--
.../db/mpp/sql/planner/plan/FragmentInstance.java | 16 ++--
.../db/mpp/sql/planner/plan/PlanFragment.java | 8 +-
.../plan/SimpleFragmentParallelPlanner.java | 4 +-
.../planner/plan/node/process/ExchangeNode.java | 8 +-
.../planner/plan/node/sink/FragmentSinkNode.java | 8 +-
.../plan/node/source/SeriesAggregateScanNode.java | 12 +--
.../planner/plan/node/source/SeriesScanNode.java | 19 ++--
.../sql/planner/plan/node/source/SourceNode.java | 6 +-
.../db/query/dataset/AlignByDeviceDataSet.java | 4 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 10 +-
.../db/service/thrift/impl/TSServiceImpl.java | 6 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 55 +++++------
.../org/apache/iotdb/rpc/RedirectException.java | 30 +++---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 8 +-
.../java/org/apache/iotdb/session/Session.java | 104 ++++++++++-----------
.../apache/iotdb/session/SessionConnection.java | 20 ++--
.../apache/iotdb/session/util/SessionUtils.java | 12 +--
.../apache/iotdb/session/SessionCacheLeaderUT.java | 52 +++++------
.../src/main/thrift/confignode.thrift | 6 +-
thrift/src/main/thrift/rpc.thrift | 4 +-
48 files changed, 298 insertions(+), 299 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index b779cb0..2bacffb 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -60,7 +60,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.slf4j.Logger;
@@ -512,7 +512,7 @@ public class Coordinator {
List<String> errorCodePartitionGroups = new ArrayList<>();
TSStatus tmpStatus;
boolean allRedirect = true;
- EndPoint endPoint = null;
+ TEndpoint endPoint = null;
for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
tmpStatus = forwardToSingleGroup(entry);
if (tmpStatus.isSetRedirectNode()) {
@@ -767,7 +767,7 @@ public class Coordinator {
}
if (!StatusUtils.TIME_OUT.equals(status)) {
if (!status.isSetRedirectNode()) {
- status.setRedirectNode(new EndPoint(node.getClientIp(),
node.getClientPort()));
+ status.setRedirectNode(new TEndpoint(node.getClientIp(),
node.getClientPort()));
}
return status;
} else {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 66d8143..b528cde 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -93,7 +93,7 @@ import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -837,7 +837,7 @@ public class DataGroupMember extends RaftMember implements
DataGroupMemberMBean
Timer.Statistic.DATA_GROUP_MEMBER_FORWARD_PLAN.calOperationCostTimeFromStart(startTime);
if (!StatusUtils.NO_LEADER.equals(result)) {
result.setRedirectNode(
- new EndPoint(leader.get().getClientIp(),
leader.get().getClientPort()));
+ new TEndpoint(leader.get().getClientIp(),
leader.get().getClientPort()));
return result;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index a241192..fe9fab5 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -87,7 +87,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -1382,7 +1382,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
if (!StatusUtils.NO_LEADER.equals(result)) {
result =
StatusUtils.getStatus(
- result, new EndPoint(leader.get().getInternalIp(),
leader.get().getClientPort()));
+ result, new TEndpoint(leader.get().getInternalIp(),
leader.get().getClientPort()));
return result;
}
}
@@ -1398,7 +1398,7 @@ public class MetaGroupMember extends RaftMember
implements IService, MetaGroupMe
TSStatus result = forwardPlan(plan, leader.get(), null);
if (!StatusUtils.NO_LEADER.equals(result)) {
result.setRedirectNode(
- new EndPoint(leader.get().getClientIp(),
leader.get().getClientPort()));
+ new TEndpoint(leader.get().getClientIp(),
leader.get().getClientPort()));
}
return result;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index f7c69c4..4d73ce8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -167,7 +167,7 @@ public abstract class BaseAsyncService implements
RaftService.AsyncIface {
resultHandler.onComplete(
StatusUtils.getStatus(
status,
- new EndPoint(
+ new TEndpoint(
member.getThisNode().getClientIp(),
member.getThisNode().getClientPort())));
} catch (Exception e) {
resultHandler.onError(e);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 93a90d6..20cc2cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.cluster.utils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class StatusUtils {
@@ -208,7 +208,7 @@ public class StatusUtils {
return status;
}
- public static TSStatus getStatus(TSStatusCode statusCode, EndPoint
redirectedNode) {
+ public static TSStatus getStatus(TSStatusCode statusCode, TEndpoint
redirectedNode) {
TSStatus status = getStatus(statusCode);
status.setRedirectNode(redirectedNode);
return status;
@@ -220,7 +220,7 @@ public class StatusUtils {
return newStatus;
}
- public static TSStatus getStatus(TSStatus status, EndPoint redirectedNode) {
+ public static TSStatus getStatus(TSStatus status, TEndpoint redirectedNode) {
TSStatus newStatus = status.deepCopy();
newStatus.setRedirectNode(redirectedNode);
return newStatus;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 10b543b..33815b8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.confignode.conf;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.consensus.common.ConsensusType;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 0f5f2c8..138974e 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.confignode.conf;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.consensus.common.ConsensusType;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index e7cab5b..7fb8485 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,13 +20,13 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.commons.partition.GroupType;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
index 52134c6..136fa23 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataNodeInfo.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.partition;
-import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.commons.partition.Endpoint;
import java.util.ArrayList;
import java.util.List;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
index 6bf97df..4d47af0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
@@ -18,10 +18,10 @@
*/
package org.apache.iotdb.confignode.physical.sys;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.confignode.partition.DataNodeInfo;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.consensus.common.Endpoint;
import java.nio.ByteBuffer;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 9619301..141e76d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.service.thrift.server;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import
org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -45,11 +46,10 @@ import
org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -106,7 +106,7 @@ public class ConfigNodeRPCServerProcessor implements
ConfigIService.Iface {
info.getDataNodeID(),
new DataNodeMessage(
info.getDataNodeID(),
- new EndPoint(info.getEndPoint().getIp(),
info.getEndPoint().getPort())));
+ new TEndpoint(info.getEndPoint().getIp(),
info.getEndPoint().getPort())));
}
return result;
}
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 6807cc6..8574e4a 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -26,7 +26,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -84,7 +84,7 @@ public class RatisConsensusDemo {
private void registerDataNodes() throws TException, InterruptedException {
// DataNodes can connect to any ConfigNode and send write requests
for (int i = 0; i < 10; i++) {
- EndPoint endPoint = new EndPoint("0.0.0.0", 6667 + i);
+ TEndpoint endPoint = new TEndpoint("0.0.0.0", 6667 + i);
DataNodeRegisterReq req = new DataNodeRegisterReq(endPoint);
DataNodeRegisterResp resp = clients[0].registerDataNode(req);
Assert.assertEquals(
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
index 270ba1c..2784f9d 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -74,7 +74,7 @@ public class ConfigManagerManualTest {
private void registerDataNodes() throws TException {
for (int i = 0; i < 3; i++) {
- DataNodeRegisterReq req = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6667 + i));
+ DataNodeRegisterReq req = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6667 + i));
DataNodeRegisterResp resp = clients[0].registerDataNode(req);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.registerResult.getCode());
@@ -114,7 +114,7 @@ public class ConfigManagerManualTest {
}
DataNodeRegisterResp resp =
- clients[1].registerDataNode(new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6670)));
+ clients[1].registerDataNode(new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6670)));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
resp.registerResult.getCode());
Assert.assertEquals(3, resp.getDataNodeID());
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 23fd30d..2c148e2 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -24,7 +24,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
@@ -44,9 +44,9 @@ public class ConfigNodeRPCServerProcessorTest {
ConfigNodeRPCServerProcessor processor = new
ConfigNodeRPCServerProcessor();
DataNodeRegisterResp resp;
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6669));
+ DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6667));
+ DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6668));
+ DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6669));
// test success register
resp = processor.registerDataNode(registerReq0);
@@ -94,9 +94,9 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertEquals("DataNode is not enough, please register more.",
status.getMessage());
// register DataNodes
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
EndPoint("0.0.0.0", 6669));
+ DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6667));
+ DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6668));
+ DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new
TEndpoint("0.0.0.0", 6669));
status = processor.registerDataNode(registerReq0).getRegisterResult();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
status = processor.registerDataNode(registerReq1).getRegisterResult();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index 02be227..59eacba 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.common;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import java.util.Objects;
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d7be0ce..9278ab9 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -34,7 +34,7 @@ import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.ratis.client.RaftClient;
@@ -185,8 +185,9 @@ public class RatisConsensus implements IConsensus {
}
if (suggestedLeader != null) {
- Endpoint leaderEndPoint = Utils.getEndPoint(suggestedLeader);
- writeResult.setRedirectNode(new EndPoint(leaderEndPoint.getIp(),
leaderEndPoint.getPort()));
+ Endpoint leaderTEndpoint = Utils.getEndpoint(suggestedLeader);
+ writeResult.setRedirectNode(
+ new TEndpoint(leaderTEndpoint.getIp(), leaderTEndpoint.getPort()));
}
return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
@@ -421,7 +422,7 @@ public class RatisConsensus implements IConsensus {
newConfiguration.add(newRaftLeader);
} else {
// degrade every other peer to default priority
- newConfiguration.add(Utils.toRaftPeer(Utils.getEndPoint(raftPeer),
DEFAULT_PRIORITY));
+ newConfiguration.add(Utils.toRaftPeer(Utils.getEndpoint(raftPeer),
DEFAULT_PRIORITY));
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index e43544f..e68ef86 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.commons.partition.GroupType;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -83,7 +83,7 @@ public class Utils {
return toRaftPeer(peer.getEndpoint(), priority);
}
- public static Endpoint getEndPoint(RaftPeer raftPeer) {
+ public static Endpoint getEndpoint(RaftPeer raftPeer) {
String address = raftPeer.getAddress(); // ip:port
String[] split = address.split(":");
return new Endpoint(split[0], Integer.parseInt(split[1]));
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index abb5c65..8cb52ad 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.commons.partition.GroupType;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index ae879e5..d4090ef 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.consensus.standalone;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.commons.partition.GroupType;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
index 9643471..0e942eb 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
@@ -26,21 +26,21 @@ import java.util.stream.Collectors;
public class DataPartitionInfo {
// Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId,
List<DataRegionPlaceInfo>>>>
- private Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ private Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<RegionReplicaSet>>>>
dataPartitionMap;
- public Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ public Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<RegionReplicaSet>>>>
getDataPartitionMap() {
return dataPartitionMap;
}
public void setDataPartitionMap(
- Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<RegionReplicaSet>>>>
dataPartitionMap) {
this.dataPartitionMap = dataPartitionMap;
}
- public List<DataRegionReplicaSet> getDataRegionReplicaSet(
+ public List<RegionReplicaSet> getDataRegionReplicaSet(
String deviceName, List<TimePartitionId> timePartitionIdList) {
String storageGroup = getStorageGroupByDevice(deviceName);
DeviceGroupId deviceGroupId = calculateDeviceGroupId(deviceName);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Endpoint.java
similarity index 97%
rename from
consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
rename to
node-commons/src/main/java/org/apache/iotdb/commons/partition/Endpoint.java
index b01b7e4..dac1198 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/Endpoint.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.common;
+package org.apache.iotdb.commons.partition;
import java.util.Objects;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
similarity index 72%
rename from
node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
rename to
node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 7346363..7736e73 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -18,32 +18,30 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
-
import java.util.List;
-public class DataRegionReplicaSet {
- private DataRegionId Id;
- private List<EndPoint> endPointList;
+public class RegionReplicaSet {
+ private ConsensusGroupId Id;
+ private List<Endpoint> endPointList;
- public DataRegionReplicaSet(DataRegionId Id, List<EndPoint> endPointList) {
+ public RegionReplicaSet(ConsensusGroupId Id, List<Endpoint> endPointList) {
this.Id = Id;
this.endPointList = endPointList;
}
- public List<EndPoint> getEndPointList() {
+ public List<Endpoint> getEndPointList() {
return endPointList;
}
- public void setEndPointList(List<EndPoint> endPointList) {
+ public void setEndPointList(List<Endpoint> endPointList) {
this.endPointList = endPointList;
}
- public DataRegionId getId() {
+ public ConsensusGroupId getId() {
return Id;
}
- public void setId(DataRegionId id) {
+ public void setId(ConsensusGroupId id) {
this.Id = id;
}
@@ -56,6 +54,6 @@ public class DataRegionReplicaSet {
}
public boolean equals(Object obj) {
- return obj instanceof DataRegionReplicaSet &&
obj.toString().equals(toString());
+ return obj instanceof RegionReplicaSet &&
obj.toString().equals(toString());
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
index 7f6e863..1bf7502 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
@@ -18,13 +18,13 @@
*/
package org.apache.iotdb.commons.partition;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import java.util.List;
public class SchemaRegionReplicaSet {
private SchemaRegionId schemaRegionId;
- private List<EndPoint> endPointList;
+ private List<TEndpoint> endPointList;
public SchemaRegionId getSchemaRegionId() {
return schemaRegionId;
@@ -34,11 +34,11 @@ public class SchemaRegionReplicaSet {
this.schemaRegionId = schemaRegionId;
}
- public List<EndPoint> getEndPointList() {
+ public List<TEndpoint> getTEndpointList() {
return endPointList;
}
- public void setEndPointList(List<EndPoint> endPointList) {
+ public void setTEndpointList(List<TEndpoint> endPointList) {
this.endPointList = endPointList;
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
index 3d9995e..72b6ad4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/CommonUtils.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.commons.utils;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +31,8 @@ import java.util.List;
public class CommonUtils {
private static final Logger logger =
LoggerFactory.getLogger(CommonUtils.class);
- public static EndPoint parseNodeUrl(String nodeUrl) throws
BadNodeUrlException {
- EndPoint result = new EndPoint();
+ public static TEndpoint parseNodeUrl(String nodeUrl) throws
BadNodeUrlException {
+ TEndpoint result = new TEndpoint();
String[] split = nodeUrl.split(":");
if (split.length != 2) {
logger.warn("Bad node url: {}", nodeUrl);
@@ -49,8 +49,8 @@ public class CommonUtils {
return result;
}
- public static List<EndPoint> parseNodeUrls(List<String> nodeUrls) throws
BadNodeUrlException {
- List<EndPoint> result = new ArrayList<>();
+ public static List<TEndpoint> parseNodeUrls(List<String> nodeUrls) throws
BadNodeUrlException {
+ List<TEndpoint> result = new ArrayList<>();
for (String url : nodeUrls) {
result.add(parseNodeUrl(url));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index fc0b7f3..162e392 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.consensus;
import org.apache.iotdb.commons.partition.ConsensusGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.commons.partition.GroupType;
import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index 0a905e1..8b87244 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.commons.partition.DataPartitionInfo;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
@@ -53,7 +53,7 @@ public class Analysis {
private Map<String, Set<PartialPath>> deviceIdToPathsMap;
- public List<DataRegionReplicaSet> getPartitionInfo(PartialPath seriesPath,
Filter timefilter) {
+ public List<RegionReplicaSet> getPartitionInfo(PartialPath seriesPath,
Filter timefilter) {
// TODO: (xingtanzjr) implement the calculation of timePartitionIdList
return dataPartitionInfo.getDataRegionReplicaSet(seriesPath.getDevice(),
null);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index 0fdc000..c5457bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.commons.partition.*;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.util.*;
@@ -37,41 +36,41 @@ public class FakePartitionFetcherImpl implements
IPartitionFetcher {
String device3 = "root.sg.d333";
DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
- Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
- dataPartitionMap = new HashMap<>();
- Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>
sgPartitionMap =
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<RegionReplicaSet>>>> dataPartitionMap =
+ new HashMap<>();
+ Map<DeviceGroupId, Map<TimePartitionId, List<RegionReplicaSet>>>
sgPartitionMap =
new HashMap<>();
- List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
d1DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(1),
- Arrays.asList(new EndPoint("192.0.1.1", 9000), new
EndPoint("192.0.1.2", 9000))));
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 1),
+ Arrays.asList(new Endpoint("192.0.1.1", 9000), new
Endpoint("192.0.1.2", 9000))));
d1DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(2),
- Arrays.asList(new EndPoint("192.0.2.1", 9000), new
EndPoint("192.0.2.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 2),
+ Arrays.asList(new Endpoint("192.0.2.1", 9000), new
Endpoint("192.0.2.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
- List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
d2DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(3),
- Arrays.asList(new EndPoint("192.0.3.1", 9000), new
EndPoint("192.0.3.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 3),
+ Arrays.asList(new Endpoint("192.0.3.1", 9000), new
Endpoint("192.0.3.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
- List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
d3DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(1),
- Arrays.asList(new EndPoint("192.0.1.1", 9000), new
EndPoint("192.0.1.2", 9000))));
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 1),
+ Arrays.asList(new Endpoint("192.0.1.1", 9000), new
Endpoint("192.0.1.2", 9000))));
d3DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(4),
- Arrays.asList(new EndPoint("192.0.4.1", 9000), new
EndPoint("192.0.4.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 4),
+ Arrays.asList(new Endpoint("192.0.4.1", 9000), new
Endpoint("192.0.4.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d3DataRegionMap = new
HashMap<>();
d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 881ea55..06f96f6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.*;
@@ -105,11 +105,11 @@ public class DistributionPlanner {
// If the child is SeriesScanNode, we need to check whether this
node should be seperated
// into several splits.
SeriesScanNode handle = (SeriesScanNode) child;
- List<DataRegionReplicaSet> dataDistribution =
+ List<RegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(handle.getSeriesPath(),
handle.getTimeFilter());
// If the size of dataDistribution is m, this SeriesScanNode should
be seperated into m
// SeriesScanNode.
- for (DataRegionReplicaSet dataRegion : dataDistribution) {
+ for (RegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
split.setDataRegionReplicaSet(dataRegion);
sources.add(split);
@@ -128,7 +128,7 @@ public class DistributionPlanner {
}
// Step 2: For the source nodes, group them by the DataRegion.
- Map<DataRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
+ Map<RegionReplicaSet, List<SeriesScanNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a
TimeJoinNode for them
// and make the
@@ -200,7 +200,7 @@ public class DistributionPlanner {
visitedChildren.add(visit(child, context));
});
- DataRegionReplicaSet dataRegion =
calculateDataRegionByChildren(visitedChildren, context);
+ RegionReplicaSet dataRegion =
calculateDataRegionByChildren(visitedChildren, context);
NodeDistributionType distributionType =
nodeDistributionIsSame(visitedChildren, context)
? NodeDistributionType.SAME_WITH_ALL_CHILDREN
@@ -229,7 +229,7 @@ public class DistributionPlanner {
return newNode;
}
- private DataRegionReplicaSet calculateDataRegionByChildren(
+ private RegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// We always make the dataRegion of TimeJoinNode to be the same as its
first child.
// TODO: (xingtanzjr) We need to implement more suitable policies here
@@ -278,9 +278,9 @@ public class DistributionPlanner {
private class NodeDistribution {
private NodeDistributionType type;
- private DataRegionReplicaSet dataRegion;
+ private RegionReplicaSet dataRegion;
- private NodeDistribution(NodeDistributionType type, DataRegionReplicaSet
dataRegion) {
+ private NodeDistribution(NodeDistributionType type, RegionReplicaSet
dataRegion) {
this.type = type;
this.dataRegion = dataRegion;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 61f9292..4c60bcd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,7 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.Endpoint;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -27,7 +28,6 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.nio.ByteBuffer;
@@ -37,8 +37,8 @@ public class FragmentInstance implements IConsensusRequest {
// The reference of PlanFragment which this instance is generated from
private PlanFragment fragment;
// The DataRegion where the FragmentInstance should run
- private DataRegionReplicaSet dataRegion;
- private EndPoint hostEndpoint;
+ private RegionReplicaSet dataRegion;
+ private Endpoint hostEndpoint;
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -52,19 +52,19 @@ public class FragmentInstance implements IConsensusRequest {
return new FragmentInstanceId(id, String.valueOf(index));
}
- public DataRegionReplicaSet getDataRegionId() {
+ public RegionReplicaSet getDataRegionId() {
return dataRegion;
}
- public void setDataRegionId(DataRegionReplicaSet dataRegion) {
+ public void setDataRegionId(RegionReplicaSet dataRegion) {
this.dataRegion = dataRegion;
}
- public EndPoint getHostEndpoint() {
+ public Endpoint getHostEndpoint() {
return hostEndpoint;
}
- public void setHostEndpoint(EndPoint hostEndpoint) {
+ public void setHostEndpoint(Endpoint hostEndpoint) {
this.hostEndpoint = hostEndpoint;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index f2a7132..5863bbf 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -55,16 +55,16 @@ public class PlanFragment {
// In current version, one PlanFragment should contain at least one
SourceNode,
// and the DataRegions of all SourceNodes should be same in one PlanFragment.
// So we can use the DataRegion of one SourceNode as the PlanFragment's
DataRegion.
- public DataRegionReplicaSet getTargetDataRegion() {
+ public RegionReplicaSet getTargetDataRegion() {
return getNodeDataRegion(root);
}
- private DataRegionReplicaSet getNodeDataRegion(PlanNode root) {
+ private RegionReplicaSet getNodeDataRegion(PlanNode root) {
if (root instanceof SourceNode) {
return ((SourceNode) root).getDataRegionReplicaSet();
}
for (PlanNode child : root.getChildren()) {
- DataRegionReplicaSet result = getNodeDataRegion(child);
+ RegionReplicaSet result = getNodeDataRegion(child);
if (result != null) {
return result;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 1be8d95..b099630 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -77,7 +77,7 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
// Get the target DataRegion for origin PlanFragment, then its instance
will be distributed one
// of them.
- DataRegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+ RegionReplicaSet dataRegion = fragment.getTargetDataRegion();
// Set DataRegion and target host for the instance
// We need to store all the replica host in case of the scenario that the
instance need to be
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4c3c47e..47e7563 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import com.google.common.collect.ImmutableList;
@@ -39,7 +39,7 @@ public class ExchangeNode extends PlanNode {
// In current version, one ExchangeNode will only have one source.
// And the fragment which the sourceNode belongs to will only have one
instance.
// Thus, by nodeId and endpoint, the ExchangeNode can know where its source
from.
- private EndPoint upstreamEndpoint;
+ private Endpoint upstreamEndpoint;
private FragmentInstanceId upstreamInstanceId;
private PlanNodeId upstreamPlanNodeId;
@@ -76,7 +76,7 @@ public class ExchangeNode extends PlanNode {
return CHILD_COUNT_NO_LIMIT;
}
- public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId,
PlanNodeId nodeId) {
+ public void setUpstream(Endpoint endPoint, FragmentInstanceId instanceId,
PlanNodeId nodeId) {
this.upstreamEndpoint = endPoint;
this.upstreamInstanceId = instanceId;
this.upstreamPlanNodeId = nodeId;
@@ -127,7 +127,7 @@ public class ExchangeNode extends PlanNode {
this.child = null;
}
- public EndPoint getUpstreamEndpoint() {
+ public Endpoint getUpstreamEndpoint() {
return upstreamEndpoint;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 7c28d2d..be4211f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,11 +18,11 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+import org.apache.iotdb.commons.partition.Endpoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import com.google.common.collect.ImmutableList;
@@ -33,7 +33,7 @@ public class FragmentSinkNode extends SinkNode {
private PlanNode child;
private ExchangeNode downStreamNode;
- private EndPoint downStreamEndpoint;
+ private Endpoint downStreamEndpoint;
private FragmentInstanceId downStreamInstanceId;
private PlanNodeId downStreamPlanNodeId;
@@ -111,13 +111,13 @@ public class FragmentSinkNode extends SinkNode {
this.downStreamNode = downStreamNode;
}
- public void setDownStream(EndPoint endPoint, FragmentInstanceId instanceId,
PlanNodeId nodeId) {
+ public void setDownStream(Endpoint endPoint, FragmentInstanceId instanceId,
PlanNodeId nodeId) {
this.downStreamEndpoint = endPoint;
this.downStreamInstanceId = instanceId;
this.downStreamPlanNodeId = nodeId;
}
- public EndPoint getDownStreamEndpoint() {
+ public Endpoint getDownStreamEndpoint() {
return downStreamEndpoint;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index fa2009d..6c0ed69 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -66,7 +66,7 @@ public class SeriesAggregateScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegionReplicaSet dataRegionReplicaSet;
+ private RegionReplicaSet regionReplicaSet;
public SeriesAggregateScanNode(PlanNodeId id) {
super(id);
@@ -110,13 +110,13 @@ public class SeriesAggregateScanNode extends SourceNode {
public void open() throws Exception {}
@Override
- public DataRegionReplicaSet getDataRegionReplicaSet() {
- return this.dataRegionReplicaSet;
+ public RegionReplicaSet getDataRegionReplicaSet() {
+ return this.regionReplicaSet;
}
@Override
- public void setDataRegionReplicaSet(DataRegionReplicaSet
dataRegionReplicaSet) {
- this.dataRegionReplicaSet = dataRegionReplicaSet;
+ public void setDataRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index a51adb5..b8e7a06 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -70,17 +70,16 @@ public class SeriesScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegionReplicaSet dataRegionReplicaSet;
+ private RegionReplicaSet regionReplicaSet;
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
}
- public SeriesScanNode(
- PlanNodeId id, PartialPath seriesPath, DataRegionReplicaSet
dataRegionReplicaSet) {
+ public SeriesScanNode(PlanNodeId id, PartialPath seriesPath,
RegionReplicaSet regionReplicaSet) {
this(id, seriesPath);
- this.dataRegionReplicaSet = dataRegionReplicaSet;
+ this.regionReplicaSet = regionReplicaSet;
}
public void setTimeFilter(Filter timeFilter) {
@@ -98,12 +97,12 @@ public class SeriesScanNode extends SourceNode {
public void open() throws Exception {}
@Override
- public DataRegionReplicaSet getDataRegionReplicaSet() {
- return dataRegionReplicaSet;
+ public RegionReplicaSet getDataRegionReplicaSet() {
+ return regionReplicaSet;
}
- public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegion) {
- this.dataRegionReplicaSet = dataRegion;
+ public void setDataRegionReplicaSet(RegionReplicaSet dataRegion) {
+ this.regionReplicaSet = dataRegion;
}
@Override
@@ -146,7 +145,7 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new SeriesScanNode(getId(), getSeriesPath(),
this.dataRegionReplicaSet);
+ return new SeriesScanNode(getId(), getSeriesPath(), this.regionReplicaSet);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 758c087..5fdf152 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -30,9 +30,9 @@ public abstract class SourceNode extends PlanNode implements
AutoCloseable {
public abstract void open() throws Exception;
- public abstract DataRegionReplicaSet getDataRegionReplicaSet();
+ public abstract RegionReplicaSet getDataRegionReplicaSet();
- public abstract void setDataRegionReplicaSet(DataRegionReplicaSet
dataRegionReplicaSet);
+ public abstract void setDataRegionReplicaSet(RegionReplicaSet
regionReplicaSet);
public abstract String getDeviceName();
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index cd38f79..8e27d1a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -225,8 +225,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
if (currentDataSet.getEndPoint() != null) {
- org.apache.iotdb.service.rpc.thrift.EndPoint endPoint =
- new org.apache.iotdb.service.rpc.thrift.EndPoint();
+ org.apache.iotdb.service.rpc.thrift.TEndpoint endPoint =
+ new org.apache.iotdb.service.rpc.thrift.TEndpoint();
endPoint.setIp(currentDataSet.getEndPoint().getIp());
endPoint.setPort(currentDataSet.getEndPoint().getPort());
throw new RedirectException(endPoint);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index d5533e6..4801f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -35,7 +35,7 @@ import
org.apache.iotdb.db.service.thrift.impl.DataNodeManagementServiceImpl;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -62,7 +62,7 @@ public class DataNode implements DataNodeMBean {
*/
private static final int DEFAULT_JOIN_RETRY = 10;
- private EndPoint thisNode = new EndPoint();
+ private TEndpoint thisNode = new TEndpoint();
private int dataNodeID;
@@ -122,7 +122,7 @@ public class DataNode implements DataNodeMBean {
}
public void joinCluster() throws StartupException {
- List<EndPoint> configNodes;
+ List<TEndpoint> configNodes;
try {
configNodes =
CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
@@ -134,7 +134,7 @@ public class DataNode implements DataNodeMBean {
while (retry > 0) {
// randomly pick up a config node to try
Random random = new Random();
- EndPoint configNode =
configNodes.get(random.nextInt(configNodes.size()));
+ TEndpoint configNode =
configNodes.get(random.nextInt(configNodes.size()));
logger.info("start joining the cluster with the help of {}", configNode);
try {
ConfigIService.Client client = createClient(configNode);
@@ -199,7 +199,7 @@ public class DataNode implements DataNodeMBean {
private DataNodeHolder() {}
}
- private ConfigIService.Client createClient(EndPoint endPoint) throws
IoTDBConnectionException {
+ private ConfigIService.Client createClient(TEndpoint endPoint) throws
IoTDBConnectionException {
TTransport transport;
try {
transport =
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index b4cbd67..79bdf6e 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -83,8 +83,8 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -868,7 +868,7 @@ public class TSServiceImpl implements TSIService.Iface {
resp.setQueryDataSet(tsQueryDataSet);
} catch (RedirectException e) {
if (plan.isEnableRedirect()) {
- EndPoint endPoint = e.getEndPoint();
+ TEndpoint endPoint = e.getTEndpoint();
return redirectQueryToAnotherNode(resp, context, endPoint.ip,
endPoint.port);
} else {
LOGGER.error(
@@ -928,7 +928,7 @@ public class TSServiceImpl implements TSIService.Iface {
ip,
port);
TSStatus status = new TSStatus();
- status.setRedirectNode(new EndPoint(ip, port));
+ status.setRedirectNode(new TEndpoint(ip, port));
status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
resp.setStatus(status);
resp.setQueryId(context.getQueryId());
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index d002001..5277b73 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.partition.ConsensusGroupId;
import org.apache.iotdb.commons.partition.DataPartitionInfo;
-import org.apache.iotdb.commons.partition.DataRegionId;
-import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.commons.partition.DeviceGroupId;
+import org.apache.iotdb.commons.partition.Endpoint;
+import org.apache.iotdb.commons.partition.GroupType;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.TimePartitionId;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -42,7 +44,6 @@ import
org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.junit.Test;
@@ -161,41 +162,41 @@ public class DistributionPlannerTest {
String device3 = "root.sg.d333";
DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
- Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<DataRegionReplicaSet>>>>
- dataPartitionMap = new HashMap<>();
- Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>
sgPartitionMap =
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId,
List<RegionReplicaSet>>>> dataPartitionMap =
+ new HashMap<>();
+ Map<DeviceGroupId, Map<TimePartitionId, List<RegionReplicaSet>>>
sgPartitionMap =
new HashMap<>();
- List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
d1DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(1),
- Arrays.asList(new EndPoint("192.0.1.1", 9000), new
EndPoint("192.0.1.2", 9000))));
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 1),
+ Arrays.asList(new Endpoint("192.0.1.1", 9000), new
Endpoint("192.0.1.2", 9000))));
d1DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(2),
- Arrays.asList(new EndPoint("192.0.2.1", 9000), new
EndPoint("192.0.2.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 2),
+ Arrays.asList(new Endpoint("192.0.2.1", 9000), new
Endpoint("192.0.2.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
- List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
d2DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(3),
- Arrays.asList(new EndPoint("192.0.3.1", 9000), new
EndPoint("192.0.3.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 3),
+ Arrays.asList(new Endpoint("192.0.3.1", 9000), new
Endpoint("192.0.3.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
- List<DataRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
d3DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(1),
- Arrays.asList(new EndPoint("192.0.1.1", 9000), new
EndPoint("192.0.1.2", 9000))));
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 1),
+ Arrays.asList(new Endpoint("192.0.1.1", 9000), new
Endpoint("192.0.1.2", 9000))));
d3DataRegions.add(
- new DataRegionReplicaSet(
- new DataRegionId(4),
- Arrays.asList(new EndPoint("192.0.4.1", 9000), new
EndPoint("192.0.4.2", 9000))));
- Map<TimePartitionId, List<DataRegionReplicaSet>> d3DataRegionMap = new
HashMap<>();
+ new RegionReplicaSet(
+ new ConsensusGroupId(GroupType.DataRegion, 4),
+ Arrays.asList(new Endpoint("192.0.4.1", 9000), new
Endpoint("192.0.4.2", 9000))));
+ Map<TimePartitionId, List<RegionReplicaSet>> d3DataRegionMap = new
HashMap<>();
d3DataRegionMap.put(new TimePartitionId(), d3DataRegions);
sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
diff --git
a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
index 15f3157..f090247 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RedirectException.java
@@ -19,34 +19,34 @@
package org.apache.iotdb.rpc;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import java.io.IOException;
import java.util.Map;
public class RedirectException extends IOException {
- private final EndPoint endPoint;
+ private final TEndpoint TEndpoint;
- private final Map<String, EndPoint> deviceEndPointMap;
+ private final Map<String, TEndpoint> deviceTEndpointMap;
- public RedirectException(EndPoint endpoint) {
- super("later request in same group will be redirected to " +
endpoint.toString());
- this.endPoint = endpoint;
- this.deviceEndPointMap = null;
+ public RedirectException(TEndpoint TEndpoint) {
+ super("later request in same group will be redirected to " +
TEndpoint.toString());
+ this.TEndpoint = TEndpoint;
+ this.deviceTEndpointMap = null;
}
- public RedirectException(Map<String, EndPoint> deviceEndPointMap) {
- super("later request in same group will be redirected to " +
deviceEndPointMap);
- this.endPoint = null;
- this.deviceEndPointMap = deviceEndPointMap;
+ public RedirectException(Map<String, TEndpoint> deviceTEndpointMap) {
+ super("later request in same group will be redirected to " +
deviceTEndpointMap);
+ this.TEndpoint = null;
+ this.deviceTEndpointMap = deviceTEndpointMap;
}
- public EndPoint getEndPoint() {
- return this.endPoint;
+ public TEndpoint getTEndpoint() {
+ return this.TEndpoint;
}
- public Map<String, EndPoint> getDeviceEndPointMap() {
- return deviceEndPointMap;
+ public Map<String, TEndpoint> getDeviceTEndpointMap() {
+ return deviceTEndpointMap;
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index b294325..d9d66de 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.rpc;
import org.apache.iotdb.protocol.influxdb.rpc.thrift.InfluxDBService;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -123,15 +123,15 @@ public class RpcUtils {
verifySuccess(status);
if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
- Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+ Map<String, TEndpoint> deviceTEndpointMap = new HashMap<>();
List<TSStatus> statusSubStatus = status.getSubStatus();
for (int i = 0; i < statusSubStatus.size(); i++) {
TSStatus subStatus = statusSubStatus.get(i);
if (subStatus.isSetRedirectNode()) {
- deviceEndPointMap.put(devices.get(i), subStatus.getRedirectNode());
+ deviceTEndpointMap.put(devices.get(i), subStatus.getRedirectNode());
}
}
- throw new RedirectException(deviceEndPointMap);
+ throw new RedirectException(deviceTEndpointMap);
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index f47aeb0..51d293c 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -121,15 +121,15 @@ public class Session {
protected int thriftDefaultBufferSize;
protected int thriftMaxFrameSize;
- protected EndPoint defaultEndPoint;
+ protected TEndpoint defaultTEndpoint;
protected SessionConnection defaultSessionConnection;
private boolean isClosed = true;
// Cluster version cache
protected boolean enableCacheLeader;
protected SessionConnection metaSessionConnection;
- protected volatile Map<String, EndPoint> deviceIdToEndpoint;
- protected volatile Map<EndPoint, SessionConnection>
endPointToSessionConnection;
+ protected volatile Map<String, TEndpoint> deviceIdToEndpoint;
+ protected volatile Map<TEndpoint, SessionConnection>
endPointToSessionConnection;
protected boolean enableQueryRedirection = false;
@@ -275,7 +275,7 @@ public class Session {
int thriftMaxFrameSize,
boolean enableCacheLeader,
Version version) {
- this.defaultEndPoint = new EndPoint(host, rpcPort);
+ this.defaultTEndpoint = new TEndpoint(host, rpcPort);
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
@@ -383,14 +383,14 @@ public class Session {
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
- defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
+ defaultSessionConnection = constructSessionConnection(this,
defaultTEndpoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
metaSessionConnection = defaultSessionConnection;
isClosed = false;
if (enableCacheLeader || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
- endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ endPointToSessionConnection.put(defaultTEndpoint,
defaultSessionConnection);
}
}
@@ -412,7 +412,7 @@ public class Session {
}
public SessionConnection constructSessionConnection(
- Session session, EndPoint endpoint, ZoneId zoneId) throws
IoTDBConnectionException {
+ Session session, TEndpoint endpoint, ZoneId zoneId) throws
IoTDBConnectionException {
if (endpoint == null) {
return new SessionConnection(session, zoneId);
}
@@ -676,16 +676,16 @@ public class Session {
private SessionDataSet executeStatementMayRedirect(String sql, long
timeoutInMs)
throws StatementExecutionException, IoTDBConnectionException {
try {
- logger.debug("{} execute sql {}",
defaultSessionConnection.getEndPoint(), sql);
+ logger.debug("{} execute sql {}",
defaultSessionConnection.getTEndpoint(), sql);
return defaultSessionConnection.executeQueryStatement(sql, timeoutInMs);
} catch (RedirectException e) {
- handleQueryRedirection(e.getEndPoint());
+ handleQueryRedirection(e.getTEndpoint());
if (enableQueryRedirection) {
logger.debug(
"{} redirect query {} to {}",
- defaultSessionConnection.getEndPoint(),
+ defaultSessionConnection.getTEndpoint(),
sql,
- e.getEndPoint());
+ e.getTEndpoint());
// retry
try {
return defaultSessionConnection.executeQueryStatement(sql,
queryTimeoutInMs);
@@ -725,9 +725,9 @@ public class Session {
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime,
endTime);
} catch (RedirectException e) {
- handleQueryRedirection(e.getEndPoint());
+ handleQueryRedirection(e.getTEndpoint());
if (enableQueryRedirection) {
- logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+ logger.debug("redirect query {} to {}", paths, e.getTEndpoint());
// retry
try {
return defaultSessionConnection.executeRawDataQuery(paths,
startTime, endTime);
@@ -753,7 +753,7 @@ public class Session {
try {
return defaultSessionConnection.executeLastDataQuery(paths, LastTime);
} catch (RedirectException e) {
- handleQueryRedirection(e.getEndPoint());
+ handleQueryRedirection(e.getTEndpoint());
if (enableQueryRedirection) {
// retry
try {
@@ -803,7 +803,7 @@ public class Session {
try {
getSessionConnection(prefixPath).insertRecord(request);
} catch (RedirectException e) {
- handleRedirection(prefixPath, e.getEndPoint());
+ handleRedirection(prefixPath, e.getTEndpoint());
}
}
@@ -812,12 +812,12 @@ public class Session {
try {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
- handleRedirection(deviceId, e.getEndPoint());
+ handleRedirection(deviceId, e.getTEndpoint());
}
}
private SessionConnection getSessionConnection(String deviceId) {
- EndPoint endPoint;
+ TEndpoint endPoint;
if (enableCacheLeader
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
@@ -835,11 +835,11 @@ public class Session {
private void removeBrokenSessionConnection(SessionConnection
sessionConnection) {
// remove the cached broken leader session
if (enableCacheLeader) {
- EndPoint endPoint = null;
- for (Iterator<Entry<EndPoint, SessionConnection>> it =
+ TEndpoint endPoint = null;
+ for (Iterator<Entry<TEndpoint, SessionConnection>> it =
endPointToSessionConnection.entrySet().iterator();
it.hasNext(); ) {
- Map.Entry<EndPoint, SessionConnection> entry = it.next();
+ Map.Entry<TEndpoint, SessionConnection> entry = it.next();
if (entry.getValue().equals(sessionConnection)) {
endPoint = entry.getKey();
it.remove();
@@ -847,9 +847,9 @@ public class Session {
}
}
- for (Iterator<Entry<String, EndPoint>> it =
deviceIdToEndpoint.entrySet().iterator();
+ for (Iterator<Entry<String, TEndpoint>> it =
deviceIdToEndpoint.entrySet().iterator();
it.hasNext(); ) {
- Map.Entry<String, EndPoint> entry = it.next();
+ Map.Entry<String, TEndpoint> entry = it.next();
if (entry.getValue().equals(endPoint)) {
it.remove();
}
@@ -864,10 +864,10 @@ public class Session {
AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
- e.getEndPoint(),
+ e.getTEndpoint(),
k -> {
try {
- return constructSessionConnection(this, e.getEndPoint(),
zoneId);
+ return constructSessionConnection(this, e.getTEndpoint(),
zoneId);
} catch (IoTDBConnectionException ex) {
exceptionReference.set(ex);
return null;
@@ -880,7 +880,7 @@ public class Session {
}
}
- private void handleRedirection(String deviceId, EndPoint endpoint)
+ private void handleRedirection(String deviceId, TEndpoint endpoint)
throws IoTDBConnectionException {
if (enableCacheLeader) {
AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
@@ -903,7 +903,7 @@ public class Session {
}
}
- private void handleQueryRedirection(EndPoint endPoint) throws
IoTDBConnectionException {
+ private void handleQueryRedirection(TEndpoint endPoint) throws
IoTDBConnectionException {
if (enableQueryRedirection) {
AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
SessionConnection connection =
@@ -1057,9 +1057,9 @@ public class Session {
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -1094,9 +1094,9 @@ public class Session {
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -1180,9 +1180,9 @@ public class Session {
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -1218,9 +1218,9 @@ public class Session {
try {
defaultSessionConnection.insertRecords(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -1274,7 +1274,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
- handleRedirection(deviceId, e.getEndPoint());
+ handleRedirection(deviceId, e.getTEndpoint());
}
}
@@ -1306,7 +1306,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
- handleRedirection(deviceId, e.getEndPoint());
+ handleRedirection(deviceId, e.getTEndpoint());
}
}
@@ -1376,7 +1376,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
- handleRedirection(deviceId, e.getEndPoint());
+ handleRedirection(deviceId, e.getTEndpoint());
}
}
@@ -1408,7 +1408,7 @@ public class Session {
try {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
- handleRedirection(deviceId, e.getEndPoint());
+ handleRedirection(deviceId, e.getTEndpoint());
}
}
@@ -1618,7 +1618,7 @@ public class Session {
try {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
- handleRedirection(tablet.deviceId, e.getEndPoint());
+ handleRedirection(tablet.deviceId, e.getTEndpoint());
}
}
@@ -1649,7 +1649,7 @@ public class Session {
try {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
- handleRedirection(tablet.deviceId, e.getEndPoint());
+ handleRedirection(tablet.deviceId, e.getTEndpoint());
}
}
@@ -1704,9 +1704,9 @@ public class Session {
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -1742,9 +1742,9 @@ public class Session {
try {
defaultSessionConnection.insertTablets(request);
} catch (RedirectException e) {
- Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, EndPoint> deviceEndPointEntry :
deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(),
deviceEndPointEntry.getValue());
+ Map<String, TEndpoint> deviceTEndpointMap = e.getDeviceTEndpointMap();
+ for (Map.Entry<String, TEndpoint> deviceTEndpointEntry :
deviceTEndpointMap.entrySet()) {
+ handleRedirection(deviceTEndpointEntry.getKey(),
deviceTEndpointEntry.getValue());
}
}
}
@@ -2455,7 +2455,7 @@ public class Session {
try {
insertConsumer.insert(connection, recordsReq);
} catch (RedirectException e) {
- e.getDeviceEndPointMap()
+ e.getDeviceTEndpointMap()
.forEach(
(deviceId, endpoint) -> {
try {
diff --git
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 930f5ba..18392f5 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
@@ -81,14 +81,14 @@ public class SessionConnection {
private long sessionId;
private long statementId;
private ZoneId zoneId;
- private EndPoint endPoint;
- private List<EndPoint> endPointList = new ArrayList<>();
+ private TEndpoint endPoint;
+ private List<TEndpoint> endPointList = new ArrayList<>();
private boolean enableRedirect = false;
// TestOnly
public SessionConnection() {}
- public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId)
+ public SessionConnection(Session session, TEndpoint endPoint, ZoneId zoneId)
throws IoTDBConnectionException {
this.session = session;
this.endPoint = endPoint;
@@ -104,7 +104,7 @@ public class SessionConnection {
initClusterConn();
}
- private void init(EndPoint endPoint) throws IoTDBConnectionException {
+ private void init(TEndpoint endPoint) throws IoTDBConnectionException {
RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
try {
@@ -160,9 +160,9 @@ public class SessionConnection {
}
private void initClusterConn() throws IoTDBConnectionException {
- for (EndPoint endPoint : endPointList) {
+ for (TEndpoint endPoint : endPointList) {
try {
- session.defaultEndPoint = endPoint;
+ session.defaultTEndpoint = endPoint;
init(endPoint);
} catch (IoTDBConnectionException e) {
if (!reconnect()) {
@@ -783,7 +783,7 @@ public class SessionConnection {
if (tryHostNum == endPointList.size()) {
break;
}
- session.defaultEndPoint = endPointList.get(j);
+ session.defaultTEndpoint = endPointList.get(j);
this.endPoint = endPointList.get(j);
if (j == endPointList.size() - 1) {
j = -1;
@@ -950,11 +950,11 @@ public class SessionConnection {
this.enableRedirect = enableRedirect;
}
- public EndPoint getEndPoint() {
+ public TEndpoint getTEndpoint() {
return endPoint;
}
- public void setEndPoint(EndPoint endPoint) {
+ public void setTEndpoint(TEndpoint endPoint) {
this.endPoint = endPoint;
}
diff --git
a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index f2f8a66..9dbee16 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.session.util;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -237,20 +237,20 @@ public class SessionUtils {
}
}
- public static List<EndPoint> parseSeedNodeUrls(List<String> nodeUrls) {
+ public static List<TEndpoint> parseSeedNodeUrls(List<String> nodeUrls) {
if (nodeUrls == null) {
throw new NumberFormatException("nodeUrls is null");
}
- List<EndPoint> endPointsList = new ArrayList<>();
+ List<TEndpoint> endPointsList = new ArrayList<>();
for (String nodeUrl : nodeUrls) {
- EndPoint endPoint = parseNodeUrl(nodeUrl);
+ TEndpoint endPoint = parseNodeUrl(nodeUrl);
endPointsList.add(endPoint);
}
return endPointsList;
}
- private static EndPoint parseNodeUrl(String nodeUrl) {
- EndPoint endPoint = new EndPoint();
+ private static TEndpoint parseNodeUrl(String nodeUrl) {
+ TEndpoint endPoint = new TEndpoint();
String[] split = nodeUrl.split(":");
if (split.length != 2) {
throw new NumberFormatException("NodeUrl Incorrect format");
diff --git
a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index 8cee7c9..007b98f 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.session;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TEndpoint;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -51,20 +51,20 @@ import static org.junit.Assert.fail;
public class SessionCacheLeaderUT {
- private static final List<EndPoint> endpoints =
- new ArrayList<EndPoint>() {
+ private static final List<TEndpoint> endpoints =
+ new ArrayList<TEndpoint>() {
{
- add(new EndPoint("127.0.0.1", 55560)); // default endpoint
- add(new EndPoint("127.0.0.1", 55561)); // meta leader endpoint
- add(new EndPoint("127.0.0.1", 55562));
- add(new EndPoint("127.0.0.1", 55563));
+ add(new TEndpoint("127.0.0.1", 55560)); // default endpoint
+ add(new TEndpoint("127.0.0.1", 55561)); // meta leader endpoint
+ add(new TEndpoint("127.0.0.1", 55562));
+ add(new TEndpoint("127.0.0.1", 55563));
}
};
private Session session;
// just for simulation
- public static EndPoint getDeviceIdBelongedEndpoint(String deviceId) {
+ public static TEndpoint getDeviceIdBelongedEndpoint(String deviceId) {
if (deviceId.startsWith("root.sg1")) {
return endpoints.get(0);
} else if (deviceId.startsWith("root.sg2")) {
@@ -765,7 +765,7 @@ public class SessionCacheLeaderUT {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken",
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55560) is
broken",
e.getMessage());
}
deviceIds.clear();
@@ -779,7 +779,7 @@ public class SessionCacheLeaderUT {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
}
deviceIds.clear();
measurementsList.clear();
@@ -793,7 +793,7 @@ public class SessionCacheLeaderUT {
session.close();
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55560) is
broken", e.getMessage());
}
// with leader cache
@@ -836,20 +836,20 @@ public class SessionCacheLeaderUT {
// set connection as broken, due to we enable the cache leader, when we
called
// ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
- // been changed to EndPoint(ip:127.0.0.1, port:55562)
+ // been changed to TEndpoint(ip:127.0.0.1, port:55562)
Assert.assertEquals(
- "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ "MockSessionConnection{ endPoint=TEndpoint(ip:127.0.0.1, port:55562)}",
((MockSession)
session).getLastConstructedSessionConnection().toString());
((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
try {
session.insertRecords(deviceIds, timestamps, measurementsList,
typesList, valuesList);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
}
assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(3, session.deviceIdToEndpoint.size());
- for (Map.Entry<String, EndPoint> endPointMap :
session.deviceIdToEndpoint.entrySet()) {
+ for (Map.Entry<String, TEndpoint> endPointMap :
session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointMap.getKey()),
endPointMap.getValue());
}
assertEquals(3, session.endPointToSessionConnection.size());
@@ -917,7 +917,7 @@ public class SessionCacheLeaderUT {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55560) is
broken",
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55560) is
broken",
e.getMessage());
}
tablet1.reset();
@@ -990,9 +990,9 @@ public class SessionCacheLeaderUT {
((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
// set connection as broken, due to we enable the cache leader, when we
called
// ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
- // been changed to EndPoint(ip:127.0.0.1, port:55562)
+ // been changed to TEndpoint(ip:127.0.0.1, port:55562)
Assert.assertEquals(
- "MockSessionConnection{ endPoint=EndPoint(ip:127.0.0.1, port:55562)}",
+ "MockSessionConnection{ endPoint=TEndpoint(ip:127.0.0.1, port:55562)}",
((MockSession)
session).getLastConstructedSessionConnection().toString());
for (long row = 0; row < 10; row++) {
@@ -1024,7 +1024,7 @@ public class SessionCacheLeaderUT {
session.insertTablets(tabletMap, true);
} catch (IoTDBConnectionException e) {
Assert.assertEquals(
- "the session connection = EndPoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
+ "the session connection = TEndpoint(ip:127.0.0.1, port:55562) is
broken", e.getMessage());
}
tablet1.reset();
tablet2.reset();
@@ -1032,7 +1032,7 @@ public class SessionCacheLeaderUT {
assertEquals(session.metaSessionConnection,
session.defaultSessionConnection);
assertEquals(2, session.deviceIdToEndpoint.size());
- for (Map.Entry<String, EndPoint> endPointEntry :
session.deviceIdToEndpoint.entrySet()) {
+ for (Map.Entry<String, TEndpoint> endPointEntry :
session.deviceIdToEndpoint.entrySet()) {
assertEquals(getDeviceIdBelongedEndpoint(endPointEntry.getKey()),
endPointEntry.getValue());
}
assertEquals(3, session.endPointToSessionConnection.size());
@@ -1090,7 +1090,7 @@ public class SessionCacheLeaderUT {
@Override
public SessionConnection constructSessionConnection(
- Session session, EndPoint endpoint, ZoneId zoneId) {
+ Session session, TEndpoint endpoint, ZoneId zoneId) {
lastConstructedSessionConnection = new MockSessionConnection(session,
endpoint, zoneId);
return lastConstructedSessionConnection;
}
@@ -1102,11 +1102,11 @@ public class SessionCacheLeaderUT {
static class MockSessionConnection extends SessionConnection {
- private EndPoint endPoint;
+ private TEndpoint endPoint;
private boolean connectionBroken;
private IoTDBConnectionException ioTDBConnectionException;
- public MockSessionConnection(Session session, EndPoint endPoint, ZoneId
zoneId) {
+ public MockSessionConnection(Session session, TEndpoint endPoint, ZoneId
zoneId) {
super();
this.endPoint = endPoint;
ioTDBConnectionException =
@@ -1199,11 +1199,11 @@ public class SessionCacheLeaderUT {
}
private RedirectException getRedirectException(List<String> deviceIds) {
- Map<String, EndPoint> deviceEndPointMap = new HashMap<>();
+ Map<String, TEndpoint> deviceTEndpointMap = new HashMap<>();
for (String deviceId : deviceIds) {
- deviceEndPointMap.put(deviceId, getDeviceIdBelongedEndpoint(deviceId));
+ deviceTEndpointMap.put(deviceId,
getDeviceIdBelongedEndpoint(deviceId));
}
- return new RedirectException(deviceEndPointMap);
+ return new RedirectException(deviceTEndpointMap);
}
public boolean isConnectionBroken() {
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index d58b449..0c90432 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -22,7 +22,7 @@ namespace java org.apache.iotdb.confignode.rpc.thrift
namespace py iotdb.thrift.confignode
struct DataNodeRegisterReq {
- 1: required rpc.EndPoint endPoint
+ 1: required rpc.TEndpoint endPoint
}
struct DataNodeRegisterResp {
@@ -32,7 +32,7 @@ struct DataNodeRegisterResp {
struct DataNodeMessage {
1: required i32 dataNodeID
- 2: required rpc.EndPoint endPoint
+ 2: required rpc.TEndpoint endPoint
}
struct SetStorageGroupReq {
@@ -90,7 +90,7 @@ struct FetchPartitionReq {
struct RegionInfo {
1: required i32 regionId
- 2: required list<rpc.EndPoint> endPointList
+ 2: required list<rpc.TEndpoint> endPointList
}
struct DataPartitionInfoResp {
diff --git a/thrift/src/main/thrift/rpc.thrift
b/thrift/src/main/thrift/rpc.thrift
index ae0086e..487a068 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -19,7 +19,7 @@
namespace java org.apache.iotdb.service.rpc.thrift
namespace py iotdb.thrift.rpc
-struct EndPoint {
+struct TEndpoint {
1: required string ip
2: required i32 port
}
@@ -29,7 +29,7 @@ struct TSStatus {
1: required i32 code
2: optional string message
3: optional list<TSStatus> subStatus
- 4: optional EndPoint redirectNode
+ 4: optional TEndpoint redirectNode
}
struct TSQueryDataSet{