This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/weak_read in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bd5006aabaa722cb01c36a305b88f975f60440a7 Author: Jinrui.Zhang <[email protected]> AuthorDate: Fri Jul 15 12:27:53 2022 +0800 add weak read consistency policy for MPP --- .../iotdb/db/mpp/common/MPPQueryContext.java | 4 +++ .../apache/iotdb/db/mpp/common/SessionInfo.java | 25 +++++++++++++++--- .../SimpleFragmentParallelPlanner.java | 28 ++++++++++++++++++++ .../db/mpp/plan/planner/plan/FragmentInstance.java | 6 +++++ .../iotdb/db/query/control/SessionManager.java | 1 + .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 2 +- .../plan/scheduler/StandaloneSchedulerTest.java | 30 ++++++++++++++++++---- 7 files changed, 86 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java index de2cf66aba..153b1e8598 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java @@ -76,4 +76,8 @@ public class MPPQueryContext { public TEndPoint getLocalInternalEndpoint() { return localInternalEndpoint; } + + public SessionInfo getSession() { + return session; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java index ad6426724b..ff53da75d4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java @@ -18,9 +18,26 @@ */ package org.apache.iotdb.db.mpp.common; -import java.time.ZoneId; - public class SessionInfo { - private String userName; - private ZoneId zoneId; + private final long sessionId; + private final String userName; + private final String zoneId; + + public SessionInfo(long sessionId, String userName, String zoneId) { + this.sessionId = sessionId; + this.userName = userName; + this.zoneId = zoneId; + } + + public long getSessionId() { + return sessionId; + } + + public String getUserName() { + return userName; + } + + public String getZoneId() { + return zoneId; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 50fd5d046a..688d49a702 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -18,7 +18,9 @@ */ package org.apache.iotdb.db.mpp.plan.planner.distribution; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; @@ -100,11 +102,37 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // redirected // to another host when scheduling fragmentInstance.setDataRegionAndHost(regionReplicaSet); + fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); + fragmentInstance.getFragment().setTypeProvider(analysis.getTypeProvider()); instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); fragmentInstanceList.add(fragmentInstance); } + private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) { + if (regionReplicaSet == null + || regionReplicaSet.getDataNodeLocations() == null + || regionReplicaSet.getDataNodeLocations().size() == 0) { + throw new IllegalArgumentException( + String.format("regionReplicaSet is invalid: %s", regionReplicaSet)); + } + String readConsistencyLevel = + IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel(); + // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or + // enums + boolean selectRandomDataNode = "weak".equals(readConsistencyLevel); + int targetIndex; + if (!selectRandomDataNode) { + targetIndex = 0; + } else { + targetIndex = + (int) + (queryContext.getSession().getSessionId() + % regionReplicaSet.getDataNodeLocationsSize()); + } + return regionReplicaSet.getDataNodeLocations().get(targetIndex); + } + private void calculateNodeTopologyBetweenInstance() { for (FragmentInstance instance : fragmentInstanceList) { PlanNode rootNode = instance.getFragment().getRoot(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java index 47e785a58a..f617ff9f02 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java @@ -87,6 +87,12 @@ public class FragmentInstance implements IConsensusRequest { } } + // Although the HostDataNode is set in method setDataRegionAndHost(), + // we still keep another method for customized needs + public void setHostDataNode(TDataNodeLocation hostDataNode) { + this.hostDataNode = hostDataNode; + } + public TRegionReplicaSet getRegionReplicaSet() { return regionReplicaSet; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index ffe0afd7b5..0a0584616d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -229,6 +229,7 @@ public class SessionManager { sessionIdToUsername.put(sessionId, username); sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId)); sessionIdToClientVersion.put(sessionId, clientVersion); + sessionIdToSessionInfo.put(sessionId, new SessionInfo(sessionId, username, zoneId)); return sessionId; } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java index 48b4137bb2..de492a843b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java @@ -74,7 +74,7 @@ public class QueryPlannerTest { new MPPQueryContext( querySql, new QueryId("query1"), - new SessionInfo(), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), new TEndPoint(), new TEndPoint()), IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"), diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java index 6a19ade539..7a7d562b1c 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java @@ -140,7 +140,11 @@ public class StandaloneSchedulerTest { configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status")); MPPQueryContext context = new MPPQueryContext( - "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint()); + "", + new QueryId("query1"), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), + new TEndPoint(), + new TEndPoint()); ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test"); QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor); @@ -239,7 +243,11 @@ public class StandaloneSchedulerTest { configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS")); MPPQueryContext context = new MPPQueryContext( - "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint()); + "", + new QueryId("query1"), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), + new TEndPoint(), + new TEndPoint()); ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test"); QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor); @@ -348,7 +356,11 @@ public class StandaloneSchedulerTest { configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3")); MPPQueryContext context = new MPPQueryContext( - "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint()); + "", + new QueryId("query1"), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), + new TEndPoint(), + new TEndPoint()); ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test"); QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor); @@ -396,7 +408,11 @@ public class StandaloneSchedulerTest { configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId)); MPPQueryContext context = new MPPQueryContext( - "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint()); + "", + new QueryId("query1"), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), + new TEndPoint(), + new TEndPoint()); ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test"); QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor); @@ -473,7 +489,11 @@ public class StandaloneSchedulerTest { configNode.getBelongedDataRegionIdWithAutoCreate(deviceId); MPPQueryContext context = new MPPQueryContext( - "", new QueryId("query1"), new SessionInfo(), new TEndPoint(), new TEndPoint()); + "", + new QueryId("query1"), + new SessionInfo(1L, "fakeUsername", "fakeZoneId"), + new TEndPoint(), + new TEndPoint()); ExecutorService executor = IoTDBThreadPoolFactory.newSingleThreadExecutor("Test"); QueryStateMachine stateMachine = new QueryStateMachine(context.getQueryId(), executor);
