This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/FixIntoOperator1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d7f5a13dc894ae224f7dd4e623e8e9bef0649da8 Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 20:06:52 2022 +0800 add some comments --- .../fragment/FragmentInstanceManager.java | 11 ++++++----- .../operator/process/AbstractIntoOperator.java | 8 +++----- .../operator/process/DeviceViewIntoOperator.java | 23 +++++++++++----------- .../execution/operator/process/IntoOperator.java | 3 ++- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 1bc2444bb1..f3da54b6c4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -68,7 +68,7 @@ public class FragmentInstanceManager { private static final long QUERY_TIMEOUT_MS = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(); - private ExecutorService intoOperationExecutor; + private final ExecutorService intoOperationExecutor; public static FragmentInstanceManager getInstance() { return FragmentInstanceManager.InstanceHolder.INSTANCE; @@ -81,10 +81,6 @@ public class FragmentInstanceManager { IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management"); this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification"); - this.intoOperationExecutor = - IoTDBThreadPoolFactory.newFixedThreadPool( - IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(), - "into-operation-executor"); this.infoCacheTime = new Duration(5, TimeUnit.MINUTES); @@ -96,6 +92,11 @@ public class FragmentInstanceManager { 200, 200, TimeUnit.MILLISECONDS); + + this.intoOperationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool( + IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(), + "into-operation-executor"); } public FragmentInstanceInfo execDataQueryFragmentInstance( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 7d41985ece..dd117e6d97 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -39,8 +39,6 @@ import org.apache.iotdb.tsfile.utils.BitMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -56,8 +54,6 @@ import static com.google.common.util.concurrent.Futures.successfulAsList; public abstract class AbstractIntoOperator implements ProcessOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class); - protected final OperatorContext operatorContext; protected final Operator child; @@ -103,6 +99,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { return insertTabletStatementGenerators; } + /** Return true if write task is submitted successfully. */ protected boolean insertMultiTabletsInternally(boolean needCheck) { InsertMultiTabletsStatement insertMultiTabletsStatement = constructInsertMultiTabletsStatement(needCheck); @@ -146,7 +143,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { () -> client.insertTablets(insertMultiTabletsStatement), writeOperationExecutor); } - protected boolean handleWriteOperationFuture() { + /** Return true if the previous write task has done. */ + protected boolean processWriteOperationFuture() { if (writeOperationFuture == null) { return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index 82214a562b..07cd3c0a0a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -78,7 +78,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - if (!handleWriteOperationFuture()) { + if (!processWriteOperationFuture()) { return null; } @@ -107,6 +107,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { } } + private List<IntoOperator.InsertTabletStatementGenerator> + constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = + deviceToTargetPathSourceInputLocationMap.get(currentDevice); + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = + deviceToTargetPathDataTypeMap.get(currentDevice); + return constructInsertTabletStatementGenerators( + targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); + } + + /** Return true if write task is submitted during processing */ private boolean processTsBlock(TsBlock inputTsBlock) { if (inputTsBlock == null || inputTsBlock.isEmpty()) { return true; @@ -163,14 +174,4 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { resultTsBlockBuilder.declarePosition(); } } - - private List<IntoOperator.InsertTabletStatementGenerator> - constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = - deviceToTargetPathSourceInputLocationMap.get(currentDevice); - Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = - deviceToTargetPathDataTypeMap.get(currentDevice); - return constructInsertTabletStatementGenerators( - targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 9eb6c14886..5aba97e22a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -63,7 +63,7 @@ public class IntoOperator extends AbstractIntoOperator { @Override public TsBlock next() { - if (!handleWriteOperationFuture()) { + if (!processWriteOperationFuture()) { return null; } @@ -86,6 +86,7 @@ public class IntoOperator extends AbstractIntoOperator { } } + /** Return true if write task is submitted during processing */ private boolean processTsBlock(TsBlock inputTsBlock) { if (inputTsBlock == null || inputTsBlock.isEmpty()) { return true;
