This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/FixIntoOperator1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 80fa61e518eb8f71a2085a9b3559ee45b0abd303 Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 19:02:03 2022 +0800 finish --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +++++ .../fragment/FragmentInstanceContext.java | 22 +++++++++-- .../fragment/FragmentInstanceManager.java | 16 +++++++- .../operator/process/AbstractIntoOperator.java | 46 ++++++++++++++-------- .../operator/process/DeviceViewIntoOperator.java | 8 ++-- .../execution/operator/process/IntoOperator.java | 9 +++-- .../plan/planner/LocalExecutionPlanContext.java | 5 +++ .../db/mpp/plan/planner/OperatorTreeGenerator.java | 6 ++- 9 files changed, 106 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c3c2da9f0d..92abf21537 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -685,6 +685,13 @@ public class IoTDBConfig { */ private int selectIntoInsertTabletPlanRowLimit = 10000; + /** + * How many thread will be set up to execute into operation. When <= 0, use max(1, CPU core number + * / 2). + */ + private int intoOperationSubmitThreadCount = + Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + /** Default TSfile storage is in local file system */ private FSType tsFileStorageFs = FSType.LOCAL; @@ -1902,6 +1909,14 @@ public class IoTDBConfig { return selectIntoInsertTabletPlanRowLimit; } + public int getIntoOperationSubmitThreadCount() { + return intoOperationSubmitThreadCount; + } + + public void setIntoOperationSubmitThreadCount(int intoOperationSubmitThreadCount) { + this.intoOperationSubmitThreadCount = intoOperationSubmitThreadCount; + } + public int getCompactionWriteThroughputMbPerSec() { return compactionWriteThroughputMbPerSec; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index bba0dd92c5..9bab834045 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -903,6 +903,15 @@ public class IoTDBDescriptor { properties.getProperty( "select_into_insert_tablet_plan_row_limit", String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit())))); + conf.setIntoOperationSubmitThreadCount( + Integer.parseInt( + properties.getProperty( + "into_operation_submit_thread_count", + String.valueOf(conf.getIntoOperationSubmitThreadCount())))); + if (conf.getIntoOperationSubmitThreadCount() <= 0) { + conf.setIntoOperationSubmitThreadCount( + Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + } conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index 69928dcec3..5a4ede9538 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -33,6 +33,7 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -65,6 +66,8 @@ public class FragmentInstanceContext extends QueryContext { // session info private SessionInfo sessionInfo; + private ExecutorService intoOperationExecutor; + // private final GcMonitor gcMonitor; // private final AtomicLong startNanos = new AtomicLong(); // private final AtomicLong startFullGcCount = new AtomicLong(-1); @@ -74,9 +77,12 @@ public class FragmentInstanceContext extends QueryContext { // private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1); public static FragmentInstanceContext createFragmentInstanceContext( - FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + SessionInfo sessionInfo, + ExecutorService intoOperationExecutor) { FragmentInstanceContext instanceContext = - new FragmentInstanceContext(id, stateMachine, sessionInfo); + new FragmentInstanceContext(id, stateMachine, sessionInfo, intoOperationExecutor); instanceContext.initialize(); instanceContext.start(); return instanceContext; @@ -87,11 +93,15 @@ public class FragmentInstanceContext extends QueryContext { } private FragmentInstanceContext( - FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { + FragmentInstanceId id, + FragmentInstanceStateMachine stateMachine, + SessionInfo sessionInfo, + ExecutorService intoOperationExecutor) { this.id = id; this.stateMachine = stateMachine; this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; + this.intoOperationExecutor = intoOperationExecutor; } @TestOnly @@ -99,7 +109,7 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { FragmentInstanceContext instanceContext = new FragmentInstanceContext( - id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId())); + id, stateMachine, new SessionInfo(1, "test", ZoneId.systemDefault().getId()), null); instanceContext.initialize(); instanceContext.start(); return instanceContext; @@ -238,4 +248,8 @@ public class FragmentInstanceContext extends QueryContext { public Optional<Throwable> getFailureCause() { return Optional.ofNullable(stateMachine.getFailureCauses().peek()); } + + public ExecutorService getIntoOperationExecutor() { + return intoOperationExecutor; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index db653ff690..5114d2dcbb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -68,6 +68,8 @@ public class FragmentInstanceManager { private static final long QUERY_TIMEOUT_MS = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); + private ExecutorService intoOperationExecutor; + public static FragmentInstanceManager getInstance() { return FragmentInstanceManager.InstanceHolder.INSTANCE; } @@ -79,6 +81,10 @@ public class FragmentInstanceManager { IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management"); this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification"); + this.intoOperationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool( + IoTDBDescriptor.getInstance().getConfig().getIntoOperationSubmitThreadCount(), + "into-operation-executor"); this.infoCacheTime = new Duration(5, TimeUnit.MINUTES); @@ -109,7 +115,10 @@ public class FragmentInstanceManager { instanceId, fragmentInstanceId -> createFragmentInstanceContext( - fragmentInstanceId, stateMachine, instance.getSessionInfo())); + fragmentInstanceId, + stateMachine, + instance.getSessionInfo(), + intoOperationExecutor)); try { DataDriver driver = @@ -165,7 +174,10 @@ public class FragmentInstanceManager { instanceId, fragmentInstanceId -> createFragmentInstanceContext( - fragmentInstanceId, stateMachine, instance.getSessionInfo())); + fragmentInstanceId, + stateMachine, + instance.getSessionInfo(), + intoOperationExecutor)); try { SchemaDriver driver = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 6181dab228..7d41985ece 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -37,10 +37,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.util.concurrent.Futures.successfulAsList; @@ -71,21 +69,20 @@ public abstract class AbstractIntoOperator implements ProcessOperator { private DataNodeInternalClient client; - private ListenableFuture<?> isBlocked = NOT_BLOCKED; - - private final ListeningExecutorService writeOperationExecutor = - MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + private final ExecutorService writeOperationExecutor; private ListenableFuture<TSStatus> writeOperationFuture; public AbstractIntoOperator( OperatorContext operatorContext, Operator child, List<InsertTabletStatementGenerator> insertTabletStatementGenerators, - Map<String, InputLocation> sourceColumnToInputLocationMap) { + Map<String, InputLocation> sourceColumnToInputLocationMap, + ExecutorService intoOperationExecutor) { this.operatorContext = operatorContext; this.child = child; this.insertTabletStatementGenerators = insertTabletStatementGenerators; this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; + this.writeOperationExecutor = intoOperationExecutor; } protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( @@ -144,14 +141,12 @@ public abstract class AbstractIntoOperator implements ProcessOperator { client = new DataNodeInternalClient(operatorContext.getSessionInfo()); } - isBlocked = SettableFuture.create(); writeOperationFuture = - writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement)); - writeOperationFuture.addListener( - () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor); + Futures.submit( + () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor); } - protected boolean writeOperationDone() { + protected boolean handleWriteOperationFuture() { if (writeOperationFuture == null) { return true; } @@ -220,9 +215,26 @@ public abstract class AbstractIntoOperator implements ProcessOperator { return operatorContext; } + private boolean writeOperationDone() { + if (writeOperationFuture == null) { + return true; + } + + return writeOperationFuture.isDone(); + } + @Override public ListenableFuture<?> isBlocked() { - return successfulAsList(Arrays.asList(isBlocked, child.isBlocked())); + ListenableFuture<?> childBlocked = child.isBlocked(); + if (writeOperationDone() && childBlocked.isDone()) { + return NOT_BLOCKED; + } else if (!writeOperationDone() && childBlocked.isDone()) { + return writeOperationFuture; + } else if (writeOperationDone() && !childBlocked.isDone()) { + return childBlocked; + } else { + return successfulAsList(Arrays.asList(writeOperationFuture, childBlocked)); + } } @Override @@ -237,7 +249,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator { if (client != null) { client.close(); } - writeOperationExecutor.shutdown(); + if (writeOperationFuture != null) { + writeOperationFuture.cancel(true); + } child.close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index e36bb49168..82214a562b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; public class DeviceViewIntoOperator extends AbstractIntoOperator { @@ -60,8 +61,9 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap, Map<String, Boolean> targetDeviceToAlignedMap, Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap, - Map<String, InputLocation> sourceColumnToInputLocationMap) { - super(operatorContext, child, null, sourceColumnToInputLocationMap); + Map<String, InputLocation> sourceColumnToInputLocationMap, + ExecutorService intoOperationExecutor) { + super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; @@ -76,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - if (!writeOperationDone()) { + if (!handleWriteOperationFuture()) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 67a4fb0b0e..9eb6c14886 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.Pair; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; public class IntoOperator extends AbstractIntoOperator { @@ -48,19 +49,21 @@ public class IntoOperator extends AbstractIntoOperator { Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, Map<String, Boolean> targetDeviceToAlignedMap, List<Pair<String, PartialPath>> sourceTargetPathPairList, - Map<String, InputLocation> sourceColumnToInputLocationMap) { + Map<String, InputLocation> sourceColumnToInputLocationMap, + ExecutorService intoOperationExecutor) { super( operatorContext, child, constructInsertTabletStatementGenerators( targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap), - sourceColumnToInputLocationMap); + sourceColumnToInputLocationMap, + intoOperationExecutor); this.sourceTargetPathPairList = sourceTargetPathPairList; } @Override public TsBlock next() { - if (!writeOperationDone()) { + if (!handleWriteOperationFuture()) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 4a3d844b73..e2497d44c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java @@ -35,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -169,4 +170,8 @@ public class LocalExecutionPlanContext { public long getDataRegionTTL() { return dataRegionTTL; } + + public ExecutorService getIntoOperationExecutor() { + return instanceContext.getIntoOperationExecutor(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 484a342715..a6c2ebd261 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -1383,7 +1383,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP targetPathToDataTypeMap, intoPathDescriptor.getTargetDeviceToAlignedMap(), intoPathDescriptor.getSourceTargetPathPairList(), - sourceColumnToInputLocationMap); + sourceColumnToInputLocationMap, + context.getIntoOperationExecutor()); } @Override @@ -1433,7 +1434,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP deviceToTargetPathDataTypeMap, deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(), deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(), - sourceColumnToInputLocationMap); + sourceColumnToInputLocationMap, + context.getIntoOperationExecutor()); } private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) {
