This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/scheduler in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fbaaa4a273f11e7e1eeaec6cc36dfaff4abca161 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue May 10 23:13:38 2022 +0800 tmp save --- .../mpp/FragmentInstanceDispatchException.java} | 22 +-- .../execution/fragment/FragmentInstanceInfo.java | 11 +- .../org/apache/iotdb/db/mpp/plan/Coordinator.java | 11 ++ .../db/mpp/plan/execution/QueryExecution.java | 6 +- .../db/mpp/plan/scheduler/ClusterScheduler.java | 6 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 191 +++++++++++++++++++++ .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 1 + 7 files changed, 227 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java similarity index 62% copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java copy to server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java index 42d915b42b..8f32997d6f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java @@ -16,25 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.fragment; -import org.apache.iotdb.consensus.common.DataSet; +package org.apache.iotdb.db.exception.mpp; -public class FragmentInstanceInfo implements DataSet { - private final FragmentInstanceState state; - - private final long endTime; - - public FragmentInstanceInfo(FragmentInstanceState state, long endTime) { - this.state = state; - this.endTime = endTime; - } - - public FragmentInstanceState getState() { - return state; - } - - public long getEndTime() { - return endTime; +public class FragmentInstanceDispatchException extends Exception { + public FragmentInstanceDispatchException(Throwable t) { + super(t); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java index 42d915b42b..585c2845c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java @@ -22,7 +22,7 @@ import org.apache.iotdb.consensus.common.DataSet; public class FragmentInstanceInfo implements DataSet { private final FragmentInstanceState state; - + private String message; private final long endTime; public FragmentInstanceInfo(FragmentInstanceState state, long endTime) { @@ -30,6 +30,11 @@ public class FragmentInstanceInfo implements DataSet { this.endTime = endTime; } + public FragmentInstanceInfo(FragmentInstanceState state, long endTime, String message) { + this(state, endTime); + this.message = message; + } + public FragmentInstanceState getState() { return state; } @@ -37,4 +42,8 @@ public class FragmentInstanceInfo implements DataSet { public long getEndTime() { return endTime; } + + public String getMessage() { + return message; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java index 23c417862c..c6f6c4a2b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java @@ -53,6 +53,8 @@ public class Coordinator { private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator"; private static final int COORDINATOR_EXECUTOR_SIZE = 10; + private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite"; + private static final int COORDINATOR_WRITE_EXECUTOR_SIZE = 10; private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled"; private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1; @@ -73,6 +75,7 @@ public class Coordinator { new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory()); private final ExecutorService executor; + private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private static final Coordinator INSTANCE = new Coordinator(); @@ -82,6 +85,7 @@ public class Coordinator { private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.executor = getQueryExecutor(); + this.writeOperationExecutor = getWriteExecutor(); this.scheduledExecutor = getScheduledExecutor(); } @@ -98,6 +102,7 @@ public class Coordinator { statement, queryContext, executor, + writeOperationExecutor, scheduledExecutor, partitionFetcher, schemaFetcher, @@ -138,6 +143,12 @@ public class Coordinator { return IoTDBThreadPoolFactory.newFixedThreadPool( COORDINATOR_EXECUTOR_SIZE, COORDINATOR_EXECUTOR_NAME); } + + private ExecutorService getWriteExecutor() { + return IoTDBThreadPoolFactory.newFixedThreadPool( + COORDINATOR_WRITE_EXECUTOR_SIZE, COORDINATOR_WRITE_EXECUTOR_NAME); + } + // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 9f4b2b9d9d..0e5da23ad1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -86,10 +86,11 @@ public class QueryExecution implements IQueryExecution { private DistributedQueryPlan distributedPlan; private final ExecutorService executor; + private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; // TODO need to use factory to decide standalone or cluster private final IPartitionFetcher partitionFetcher; - // TODO need to use factory to decide standalone or cluster + // TODO need to use factory to decide standalone or cluster, private final ISchemaFetcher schemaFetcher; // The result of QueryExecution will be written to the DataBlockManager in current Node. @@ -103,11 +104,13 @@ public class QueryExecution implements IQueryExecution { Statement statement, MPPQueryContext context, ExecutorService executor, + ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { this.executor = executor; + this.writeOperationExecutor = writeOperationExecutor; this.scheduledExecutor = scheduledExecutor; this.context = context; this.planOptimizers = new ArrayList<>(); @@ -176,6 +179,7 @@ public class QueryExecution implements IQueryExecution { distributedPlan.getInstances(), context.getQueryType(), executor, + writeOperationExecutor, scheduledExecutor, internalServiceClientManager) : new StandaloneScheduler( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index ced23a70ac..dd2ca46271 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -58,6 +58,7 @@ public class ClusterScheduler implements IScheduler { private List<FragmentInstance> instances; private ExecutorService executor; + private ExecutorService writeOperationExecutor; private ScheduledExecutorService scheduledExecutor; private IFragInstanceDispatcher dispatcher; @@ -70,6 +71,7 @@ public class ClusterScheduler implements IScheduler { List<FragmentInstance> instances, QueryType queryType, ExecutorService executor, + ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { this.queryContext = queryContext; @@ -78,7 +80,9 @@ public class ClusterScheduler implements IScheduler { this.queryType = queryType; this.executor = executor; this.scheduledExecutor = scheduledExecutor; - this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager); + this.dispatcher = + new FragmentInstanceDispatcherImpl( + queryType, executor, writeOperationExecutor, internalServiceClientManager); this.stateTracker = new FixedRateFragInsStateTracker( stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java new file mode 100644 index 0000000000..8922de59e3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.scheduler; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.consensus.ConsensusImpl; +import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo; +import org.apache.iotdb.db.mpp.plan.analyze.QueryType; +import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; +import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; +import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; +import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.SettableFuture; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { + + private static final Logger logger = + LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class); + private final ExecutorService executor; + private final ExecutorService writeOperationExecutor; + private final QueryType type; + private final String localhostIpAddr; + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> + internalServiceClientManager; + + public FragmentInstanceDispatcherImpl( + QueryType type, + ExecutorService executor, + ExecutorService writeOperationExecutor, + IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) { + this.type = type; + this.executor = executor; + this.writeOperationExecutor = writeOperationExecutor; + this.internalServiceClientManager = internalServiceClientManager; + this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalIp(); + } + + @Override + public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) { + if (type == QueryType.READ) { + return dispatchRead(instances); + } else { + return dispatchWrite(instances); + } + } + + // TODO: (xingtanzjr) currently we use a sequential dispatch policy for READ, which is + // unsafe for current FragmentInstance scheduler framework. We need to implement the + // topological dispatch according to dependency relations between FragmentInstances + private Future<FragInstanceDispatchResult> dispatchRead(List<FragmentInstance> instances) { + return executor.submit( + () -> { + for (FragmentInstance instance : instances) { + boolean accepted = dispatchOneInstance(instance); + if (!accepted) { + return new FragInstanceDispatchResult(false); + } + } + return new FragInstanceDispatchResult(true); + }); + } + + // TODO: (xingtanzjr) Return the detailed write states for each FragmentInstance + private Future<FragInstanceDispatchResult> dispatchWrite(List<FragmentInstance> instances) { + List<Future<Boolean>> futures = new LinkedList<>(); + for (FragmentInstance instance : instances) { + futures.add(writeOperationExecutor.submit(() -> dispatchOneInstance(instance))); + } + SettableFuture<FragInstanceDispatchResult> resultFuture = SettableFuture.create(); + for (Future<Boolean> future : futures) { + try { + Boolean success = future.get(); + if (!success) { + resultFuture.set(new FragInstanceDispatchResult(false)); + break; + } + } catch (ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + resultFuture.setException(e); + break; + } + } + resultFuture.set(new FragInstanceDispatchResult(true)); + return resultFuture; + } + + private boolean dispatchOneInstance(FragmentInstance instance) + throws FragmentInstanceDispatchException { + TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); + if (isDispatchedToLocal(endPoint)) { + return dispatchLocally(instance); + } else { + return dispatchRemote(instance, endPoint); + } + } + + private boolean isDispatchedToLocal(TEndPoint endPoint) { + return this.localhostIpAddr.equals(endPoint.getIp()); + } + + private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint) + throws FragmentInstanceDispatchException { + try (SyncDataNodeInternalServiceClient client = + internalServiceClientManager.borrowClient(endPoint)) { + ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); + instance.serializeRequest(buffer); + buffer.flip(); + TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId(); + TSendFragmentInstanceReq req = + new TSendFragmentInstanceReq( + new TFragmentInstance(buffer), groupId, instance.getType().toString()); + TSendFragmentInstanceResp resp = client.sendFragmentInstance(req); + return resp.accepted; + } catch (IOException | TException e) { + logger.error("can't connect to node {}", endPoint, e); + throw new FragmentInstanceDispatchException(e); + } + } + + private boolean dispatchLocally(FragmentInstance instance) + throws FragmentInstanceDispatchException { + ConsensusGroupId groupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId( + instance.getRegionReplicaSet().getRegionId()); + switch (instance.getType()) { + case READ: + FragmentInstanceInfo info = + (FragmentInstanceInfo) ConsensusImpl.getInstance().read(groupId, instance).getDataset(); + return !info.getState().isFailed(); + case WRITE: + PlanNode planNode = instance.getFragment().getRoot(); + if (planNode instanceof InsertNode) { + try { + SchemaValidator.validate((InsertNode) planNode); + } catch (SemanticException e) { + throw new FragmentInstanceDispatchException(e); + } + } + ConsensusWriteResponse resp = ConsensusImpl.getInstance().write(groupId, instance); + return TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode(); + } + throw new UnsupportedOperationException( + String.format("unknown query type [%s]", instance.getType())); + } + + @Override + public void abort() {} +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java index c6694843a0..48b4137bb2 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java @@ -78,6 +78,7 @@ public class QueryPlannerTest { new TEndPoint(), new TEndPoint()), IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"), + IoTDBThreadPoolFactory.newSingleThreadExecutor("test_write_operation"), IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"), new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl(),
