This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d7f5a13dc894ae224f7dd4e623e8e9bef0649da8
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Nov 30 20:06:52 2022 +0800

    add some comments
---
 .../fragment/FragmentInstanceManager.java          | 11 ++++++-----
 .../operator/process/AbstractIntoOperator.java     |  8 +++-----
 .../operator/process/DeviceViewIntoOperator.java   | 23 +++++++++++-----------
 .../execution/operator/process/IntoOperator.java   |  3 ++-
 4 files changed, 23 insertions(+), 22 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 1bc2444bb1..f3da54b6c4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -68,7 +68,7 @@ public class FragmentInstanceManager {
   private static final long QUERY_TIMEOUT_MS =
       IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
 
-  private ExecutorService intoOperationExecutor;
+  private final ExecutorService intoOperationExecutor;
 
   public static FragmentInstanceManager getInstance() {
     return FragmentInstanceManager.InstanceHolder.INSTANCE;
@@ -81,10 +81,6 @@ public class FragmentInstanceManager {
         IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
"instance-management");
     this.instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
-    this.intoOperationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(
-            
IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
-            "into-operation-executor");
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
@@ -96,6 +92,11 @@ public class FragmentInstanceManager {
         200,
         200,
         TimeUnit.MILLISECONDS);
+
+    this.intoOperationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            
IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(),
+            "into-operation-executor");
   }
 
   public FragmentInstanceInfo execDataQueryFragmentInstance(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 7d41985ece..dd117e6d97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -39,8 +39,6 @@ import org.apache.iotdb.tsfile.utils.BitMap;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,8 +54,6 @@ import static 
com.google.common.util.concurrent.Futures.successfulAsList;
 
 public abstract class AbstractIntoOperator implements ProcessOperator {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractIntoOperator.class);
-
   protected final OperatorContext operatorContext;
   protected final Operator child;
 
@@ -103,6 +99,7 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
+  /** Return true if write task is submitted successfully. */
   protected boolean insertMultiTabletsInternally(boolean needCheck) {
     InsertMultiTabletsStatement insertMultiTabletsStatement =
         constructInsertMultiTabletsStatement(needCheck);
@@ -146,7 +143,8 @@ public abstract class AbstractIntoOperator implements 
ProcessOperator {
             () -> client.insertTablets(insertMultiTabletsStatement), 
writeOperationExecutor);
   }
 
-  protected boolean handleWriteOperationFuture() {
+  /** Return true if the previous write task has finished. */
+  protected boolean processWriteOperationFuture() {
     if (writeOperationFuture == null) {
       return true;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 82214a562b..07cd3c0a0a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -78,7 +78,7 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleWriteOperationFuture()) {
+    if (!processWriteOperationFuture()) {
       return null;
     }
 
@@ -107,6 +107,17 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
     }
   }
 
+  private List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+    Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap =
+        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+        deviceToTargetPathDataTypeMap.get(currentDevice);
+    return constructInsertTabletStatementGenerators(
+        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, 
targetDeviceToAlignedMap);
+  }
+
+  /** Return true if a write task is submitted during processing. */
   private boolean processTsBlock(TsBlock inputTsBlock) {
     if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return true;
@@ -163,14 +174,4 @@ public class DeviceViewIntoOperator extends 
AbstractIntoOperator {
       resultTsBlockBuilder.declarePosition();
     }
   }
-
-  private List<IntoOperator.InsertTabletStatementGenerator>
-      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
-    Map<PartialPath, Map<String, InputLocation>> 
targetPathToSourceInputLocationMap =
-        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
-    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
-        deviceToTargetPathDataTypeMap.get(currentDevice);
-    return constructInsertTabletStatementGenerators(
-        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, 
targetDeviceToAlignedMap);
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 9eb6c14886..5aba97e22a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -63,7 +63,7 @@ public class IntoOperator extends AbstractIntoOperator {
 
   @Override
   public TsBlock next() {
-    if (!handleWriteOperationFuture()) {
+    if (!processWriteOperationFuture()) {
       return null;
     }
 
@@ -86,6 +86,7 @@ public class IntoOperator extends AbstractIntoOperator {
     }
   }
 
+  /** Return true if a write task is submitted during processing. */
   private boolean processTsBlock(TsBlock inputTsBlock) {
     if (inputTsBlock == null || inputTsBlock.isEmpty()) {
       return true;

Reply via email to