This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d96ade469 Fix potential deadlock when freeing memory in MemoryPool
89d96ade469 is described below

commit 89d96ade469da99b6ca15c740aae1a91087a2b54
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri May 26 12:58:16 2023 +0800

    Fix potential deadlock when freeing memory in MemoryPool
---
 .../src/assembly/resources/conf/iotdb-common.properties  |  5 +++--
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java  |  5 +++--
 .../mpp/execution/exchange/MPPDataExchangeManager.java   | 10 +++++++---
 .../db/mpp/execution/exchange/SharedTsBlockQueue.java    | 16 +++++++++++++---
 .../db/mpp/execution/exchange/LocalSinkChannelTest.java  | 14 ++++++++++++--
 .../db/mpp/execution/exchange/LocalSourceHandleTest.java | 14 ++++++++++++--
 .../mpp/execution/exchange/SharedTsBlockQueueTest.java   |  7 ++++++-
 7 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 6e1087984ee..4a22248cbd1 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -431,9 +431,10 @@ cluster_name=defaultCluster
 # Datatype: int
 # query_thread_count=0
 
-# How many pipeline drivers will be created for one fragment instance. When <= 
0, use CPU core number / 2.
+# How many pipeline drivers will be created for one fragment instance (FI). The default degree of parallelism (dop) is 1, which means the FI will not be split further.
+# CPU core number / 2 is another possible choice.
 # Datatype: int
-# degree_of_query_parallelism=0
+# degree_of_query_parallelism=1
 
 # The threshold of count map size when calculating the MODE aggregation 
function
 # Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 838e8e3e232..6654a38e6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -335,7 +335,8 @@ public class IoTDBConfig {
   /** How many threads can concurrently execute query statement. When <= 0, 
use CPU core number. */
   private int queryThreadCount = Runtime.getRuntime().availableProcessors();
 
-  private int degreeOfParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+  /** Degree of parallelism (dop) of a fragment instance; defaults to 1 for now. */
+  private int degreeOfParallelism = 1;
 
   private int modeMapSizeThreshold = 10000;
 
@@ -1604,7 +1605,7 @@ public class IoTDBConfig {
   }
 
   public void setDegreeOfParallelism(int degreeOfParallelism) {
-    this.degreeOfParallelism = degreeOfParallelism;
+    this.degreeOfParallelism = Math.max(1, degreeOfParallelism);
   }
 
   public int getDegreeOfParallelism() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index c1273ba2a48..119b8178ae9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -591,7 +591,9 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       queue = localSourceHandle.getSharedTsBlockQueue();
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
-      queue = new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, 
localMemoryManager);
+      queue =
+          new SharedTsBlockQueue(
+              localFragmentInstanceId, localPlanNodeId, localMemoryManager, 
executorService);
     }
 
     return new LocalSinkChannel(
@@ -614,7 +616,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
-            localMemoryManager);
+            localMemoryManager,
+            executorService);
     queue.allowAddingTsBlock();
     return new LocalSinkChannel(
         queue,
@@ -770,7 +773,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     } else {
       LOGGER.debug("Create SharedTsBlockQueue");
       queue =
-          new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, 
localMemoryManager);
+          new SharedTsBlockQueue(
+              remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, 
executorService);
     }
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 6e484262d50..905199f76e6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -38,9 +38,9 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
 /** This is not thread safe class, the caller should ensure multi-threads 
safety. */
 @NotThreadSafe
@@ -81,10 +81,14 @@ public class SharedTsBlockQueue {
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  // used for SharedTsBlockQueue listener
+  private final ExecutorService executorService;
+
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId,
       String planNodeId,
-      LocalMemoryManager localMemoryManager) {
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be 
null");
     this.fullFragmentInstanceId =
@@ -92,6 +96,7 @@ public class SharedTsBlockQueue {
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be 
null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be 
null");
+    this.executorService = Validate.notNull(executorService, "ExecutorService 
can not be null.");
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -235,7 +240,12 @@ public class SharedTsBlockQueue {
               }
             }
           },
-          directExecutor());
+          // Using directExecutor() here could lead to deadlock: Thread A holds the lock of
+          // SharedTsBlockQueueA and tries to invoke the listener of SharedTsBlockQueueB
+          // (when freeing memory to complete the MemoryReservationFuture), while
+          // Thread B holds the lock of SharedTsBlockQueueB and tries to invoke the
+          // listener of SharedTsBlockQueueA.
+          executorService);
     } else { // reserve memory succeeded, add the TsBlock directly
       queue.add(tsBlock);
       if (!blocked.isDone()) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
index dac0ec75d85..694edfc6b6e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static 
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class LocalSinkChannelTest {
   @Test
   public void testSend() {
@@ -50,7 +52,11 @@ public class LocalSinkChannelTest {
     SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
     // Construct a shared TsBlock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, 
mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     // Construct Sink.
     LocalSinkChannel localSinkChannel =
@@ -137,7 +143,11 @@ public class LocalSinkChannelTest {
     SinkListener mockSinkListener = Mockito.mock(SinkListener.class);
     // Construct a shared tsblock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(remoteFragmentInstanceId, remotePlanNodeId, 
mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            remoteFragmentInstanceId,
+            remotePlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     // Construct SinkChannel.
     LocalSinkChannel localSinkChannel =
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
index aa15f199676..db57cd0c329 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandleTest.java
@@ -30,6 +30,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static 
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class LocalSourceHandleTest {
   @Test
   public void testReceive() {
@@ -47,7 +49,11 @@ public class LocalSourceHandleTest {
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a shared TsBlock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, 
mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
@@ -91,7 +97,11 @@ public class LocalSourceHandleTest {
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a shared tsblock queue.
     SharedTsBlockQueue queue =
-        new SharedTsBlockQueue(localFragmentInstanceId, localPlanNodeId, 
mockLocalMemoryManager);
+        new SharedTsBlockQueue(
+            localFragmentInstanceId,
+            localPlanNodeId,
+            mockLocalMemoryManager,
+            newDirectExecutorService());
 
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
index e5336d2ac2a..9c3399a6e17 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueueTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 public class SharedTsBlockQueueTest {
   @Test(timeout = 5000L)
   public void concurrencyTest() {
@@ -46,7 +48,10 @@ public class SharedTsBlockQueueTest {
     
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
-            new TFragmentInstanceId(queryId, 0, "0"), "test", 
mockLocalMemoryManager);
+            new TFragmentInstanceId(queryId, 0, "0"),
+            "test",
+            mockLocalMemoryManager,
+            newDirectExecutorService());
     queue.getCanAddTsBlock().set(null);
     queue.setMaxBytesCanReserve(Long.MAX_VALUE);
 

Reply via email to