This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch MemoryFreeBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04f518e71162acb85cf0e7d417f1e392214e2751
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Apr 4 16:14:18 2023 +0800

    Fix a memory free bug and delete useless O(N^2) check in DriverContext
---
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  2 +-
 .../db/mpp/execution/driver/DriverContext.java     | 10 ----
 .../execution/exchange/MPPDataExchangeManager.java | 57 +++++++++++++---------
 3 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 24164c4e89..1d5ca26b22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -58,7 +58,7 @@ public class FragmentInstanceId {
   }
 
   public String getFragmentInstanceId() {
-    return fragmentId + "." + instanceId;
+    return fragmentId.getId() + "." + instanceId;
   }
 
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 58a131545f..9b0ae75f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -29,8 +29,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class DriverContext {
 
   private boolean inputDriver = true;
@@ -51,14 +49,6 @@ public class DriverContext {
 
   public OperatorContext addOperatorContext(
       int operatorId, PlanNodeId planNodeId, String operatorType) {
-    checkArgument(operatorId >= 0, "operatorId is negative");
-
-    for (OperatorContext operatorContext : operatorContexts) {
-      checkArgument(
-          operatorId != operatorContext.getOperatorId(),
-          "A context already exists for operatorId %s",
-          operatorId);
-    }
 
     OperatorContext operatorContext =
         new OperatorContext(operatorId, planNodeId, operatorType, this);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9a58904c79..73729fe0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -485,11 +485,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
 
-    LOGGER.debug(
-        "Create local sink handle to plan node {} of {} for {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create local sink handle to plan node {} of {} for {}",
+          remotePlanNodeId,
+          remoteFragmentInstanceId,
+          localFragmentInstanceId);
+    }
 
     SharedTsBlockQueue queue;
     Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(remoteFragmentInstanceId);
@@ -515,7 +517,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISinkChannel createLocalSinkChannelForPipeline(
       DriverContext driverContext, String planNodeId) {
-    LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
+    }
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
@@ -537,11 +541,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       // FragmentInstanceContext
       FragmentInstanceContext instanceContext) {
 
-    LOGGER.debug(
-        "Create sink handle to plan node {} of {} for {}",
-        remotePlanNodeId,
-        remoteFragmentInstanceId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create sink handle to plan node {} of {} for {}",
+          remotePlanNodeId,
+          remoteFragmentInstanceId,
+          localFragmentInstanceId);
+    }
 
     return new SinkChannel(
         remoteEndpoint,
@@ -623,7 +629,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    */
   public ISourceHandle createLocalSourceHandleForPipeline(
       SharedTsBlockQueue queue, DriverContext context) {
-    LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Create local source handle for {}", context.getDriverTaskID());
+    }
     return new LocalSourceHandle(
         queue,
         new PipelineSourceHandleListenerImpl(context::failed),
@@ -647,11 +655,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    LOGGER.debug(
-        "Create local source handle from {} for plan node {} of {}",
-        remoteFragmentInstanceId,
-        localPlanNodeId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create local source handle from {} for plan node {} of {}",
+          remoteFragmentInstanceId,
+          localPlanNodeId,
+          localFragmentInstanceId);
+    }
+
     SharedTsBlockQueue queue;
     ISinkHandle sinkHandle = shuffleSinkHandles.get(remoteFragmentInstanceId);
     if (sinkHandle != null) {
@@ -692,11 +703,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
               + " exists.");
     }
 
-    LOGGER.debug(
-        "Create source handle from {} for plan node {} of {}",
-        remoteFragmentInstanceId,
-        localPlanNodeId,
-        localFragmentInstanceId);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Create source handle from {} for plan node {} of {}",
+          remoteFragmentInstanceId,
+          localPlanNodeId,
+          localFragmentInstanceId);
+    }
 
     SourceHandle sourceHandle =
         new SourceHandle(

Reply via email to