This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-task-schedule in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 415a7ce4f0c030f6cd0a4a476ba27e629b7747e1 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 22 22:39:02 2023 +0800 pipe task scheduler --- .../execution/executor/PipeSubtaskExecutor.java | 24 ++++-- .../execution/scheduler/PipeSubtaskScheduler.java | 89 ++++++++++++++++++++++ .../execution/scheduler/PipeTaskScheduler.java | 74 ------------------ .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 26 ++++++- .../executor/PipeConnectorSubtaskExecutorTest.java | 3 +- .../executor/PipeProcessorSubtaskExecutorTest.java | 3 +- .../executor/PipeSubtaskExecutorTest.java | 3 +- 7 files changed, 133 insertions(+), 89 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java index 762561546b3..d1befd3e08a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.execution.executor; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; import com.google.common.util.concurrent.ListeningExecutorService; @@ -44,7 +45,8 @@ public abstract class PipeSubtaskExecutor { private final Map<String, PipeSubtask> registeredIdSubtaskMapper; - private int corePoolSize; + private final int corePoolSize; + private int runningSubtaskNumber; protected PipeSubtaskExecutor(int corePoolSize, ThreadName threadName) { subtaskWorkerThreadPoolExecutor = @@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor { registeredIdSubtaskMapper = new ConcurrentHashMap<>(); this.corePoolSize = corePoolSize; + runningSubtaskNumber = 0; } /////////////////////// subtask management /////////////////////// @@ -65,7 +68,10 @@ public abstract class PipeSubtaskExecutor { } registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask); - subtask.bindExecutors(subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor); + subtask.bindExecutors( + subtaskWorkerThreadPoolExecutor, + subtaskCallbackListeningExecutor, + new PipeSubtaskScheduler(this)); } public final synchronized void start(String subTaskID) { @@ -82,6 +88,7 @@ public abstract class PipeSubtaskExecutor { } else { subtask.allowSubmittingSelf(); subtask.submitSelf(); + ++runningSubtaskNumber; LOGGER.info("The subtask {} is started to submit self.", subTaskID); } } @@ -92,7 +99,9 @@ public abstract class PipeSubtaskExecutor { return; } - registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf(); + if (registeredIdSubtaskMapper.get(subTaskID).disallowSubmittingSelf()) { + --runningSubtaskNumber; + } } public final synchronized void deregister(String subTaskID) { @@ -138,12 +147,11 @@ public abstract class PipeSubtaskExecutor { return subtaskWorkerThreadPoolExecutor.isShutdown(); } - public final void adjustExecutorThreadNumber(int threadNum) { - corePoolSize = threadNum; - throw new UnsupportedOperationException("Not implemented yet."); + public final int getCorePoolSize() { + return corePoolSize; } - public final int getExecutorThreadNumber() { - return corePoolSize; + public final int getRunningSubtaskNumber() { + return runningSubtaskNumber; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java new file mode 100644 index 00000000000..dada354e743 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.execution.scheduler; + +import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor; + +public class PipeSubtaskScheduler { + + private final PipeSubtaskExecutor executor; + + private boolean isFirstSchedule = true; + + // TODO: make these two configurable + + private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000; + private int consumedEventCountCheckpointInterval; + private int consumedEventCount; + + // in ms + private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L; + private long timeDurationCheckpointInterval; + private long lastCheckTime; + + public PipeSubtaskScheduler(PipeSubtaskExecutor executor) { + this.executor = executor; + } + + public boolean schedule() { + if (isFirstSchedule) { + isFirstSchedule = false; + + adjustCheckpointIntervalBasedOnExecutorStatus(); + + ++consumedEventCount; + return true; + } + + if (consumedEventCount < consumedEventCountCheckpointInterval + && System.currentTimeMillis() - lastCheckTime < timeDurationCheckpointInterval) { + ++consumedEventCount; + return true; + } + + return false; + } + + private void adjustCheckpointIntervalBasedOnExecutorStatus() { + // 1. reset consumedEventCount and lastCheckTime + consumedEventCount = 0; + lastCheckTime = System.currentTimeMillis(); + + // 2. adjust checkpoint interval + final int corePoolSize = Math.max(1, executor.getCorePoolSize()); + final int runningSubtaskNumber = Math.max(1, executor.getRunningSubtaskNumber()); + consumedEventCountCheckpointInterval = + Math.max( + 1, + (int) + (((float) BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT / runningSubtaskNumber) + * corePoolSize)); + timeDurationCheckpointInterval = + Math.max( + 1, + (long) + (((float) BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION / runningSubtaskNumber) + * corePoolSize)); + } + + public void reset() { + isFirstSchedule = true; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java deleted file mode 100644 index 4f035ca6715..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeTaskScheduler.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.execution.scheduler; - -import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager; - -/** - * PipeTaskScheduler is a singleton class that manages the numbers of threads used by - * PipeTaskExecutors dynamically. - */ -public class PipeTaskScheduler { - - private final PipeSubtaskExecutorManager pipeSubtaskExecutorManager = - PipeSubtaskExecutorManager.getInstance(); - - public void adjustAssignerSubtaskExecutorThreadNum(int threadNum) { - // TODO: make it configurable by setting different parameters - pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().adjustExecutorThreadNumber(threadNum); - } - - public int getAssignerSubtaskExecutorThreadNum() { - return pipeSubtaskExecutorManager.getAssignerSubtaskExecutor().getExecutorThreadNumber(); - } - - public void adjustConnectorSubtaskExecutorThreadNum(int threadNum) { - // TODO: make it configurable by setting different parameters - pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().adjustExecutorThreadNumber(threadNum); - } - - public int getConnectorSubtaskExecutorThreadNum() { - return pipeSubtaskExecutorManager.getConnectorSubtaskExecutor().getExecutorThreadNumber(); - } - - public void adjustProcessorSubtaskExecutorThreadNum(int threadNum) { - // TODO: make it configurable by setting different parameters - pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().adjustExecutorThreadNumber(threadNum); - } - - public int getProcessorSubtaskExecutorThreadNum() { - return pipeSubtaskExecutorManager.getProcessorSubtaskExecutor().getExecutorThreadNumber(); - } - - ///////////////////////// Singleton Instance Holder ///////////////////////// - - private PipeTaskScheduler() {} - - private static class PipeTaskSchedulerHolder { - private static PipeTaskScheduler instance = null; - } - - public static PipeTaskScheduler setupAndGetInstance() { - if (PipeTaskSchedulerHolder.instance == null) { - PipeTaskSchedulerHolder.instance = new PipeTaskScheduler(); - } - return PipeTaskSchedulerHolder.instance; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java index bed57c90e88..5ddff7b2f29 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler; import org.apache.iotdb.pipe.api.event.Event; import com.google.common.util.concurrent.FutureCallback; @@ -47,6 +48,8 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void private final DecoratingLock callbackDecoratingLock = new DecoratingLock(); private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true); + private PipeSubtaskScheduler subtaskScheduler; + protected static final int MAX_RETRY_TIMES = 5; private final AtomicInteger retryCount = new AtomicInteger(0); protected Throwable lastFailedCause; @@ -60,14 +63,25 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void public void bindExecutors( ListeningExecutorService subtaskWorkerThreadPoolExecutor, - ExecutorService subtaskCallbackListeningExecutor) { + ExecutorService subtaskCallbackListeningExecutor, + PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; + this.subtaskScheduler = subtaskScheduler; } @Override public Void call() throws Exception { - executeOnce(); + // if the scheduler allows to schedule, then try to consume an event + while (subtaskScheduler.schedule()) { + // if the event is consumed successfully, then continue to consume the next event + // otherwise, stop consuming + if (!executeOnce()) { + break; + } + } + // reset the scheduler to make sure that the scheduler can schedule again + subtaskScheduler.reset(); // wait for the callable to be decorated by Futures.addCallback in the executorService // to make sure that the callback can be submitted again on success or failure. @@ -131,8 +145,12 @@ public abstract class PipeSubtask implements FutureCallback<Void>, Callable<Void shouldStopSubmittingSelf.set(false); } - public void disallowSubmittingSelf() { - shouldStopSubmittingSelf.set(true); + /** + * @return true if the shouldStopSubmittingSelf state is changed from false to true, false + * otherwise + */ + public boolean disallowSubmittingSelf() { + return !shouldStopSubmittingSelf.getAndSet(true); } public boolean isSubmittingSelf() { diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java similarity index 92% rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java index 52acbe9c2ff..4c572438955 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeConnectorSubtaskExecutorTest.java @@ -17,8 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.execution.executor; +package org.apache.iotdb.db.pipe.executor; +import org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.queue.ListenableBlockingPendingQueue; import org.apache.iotdb.db.pipe.task.subtask.PipeConnectorSubtask; import org.apache.iotdb.pipe.api.PipeConnector; diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java similarity index 92% rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java index d0a5208d537..a8a8659f7fa 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeProcessorSubtaskExecutorTest.java @@ -17,8 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.execution.executor; +package org.apache.iotdb.db.pipe.executor; +import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor; import org.apache.iotdb.db.pipe.task.queue.EventSupplier; import org.apache.iotdb.db.pipe.task.subtask.PipeProcessorSubtask; import org.apache.iotdb.pipe.api.PipeProcessor; diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java similarity index 97% rename from server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java index d24efc3fb7f..f70d6874211 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/executor/PipeSubtaskExecutorTest.java @@ -17,8 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.execution.executor; +package org.apache.iotdb.db.pipe.executor; +import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; import org.junit.After;
