This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad3384f0f Change config in SourceHandleTest according to IoTDBDescriptor
5ad3384f0f is described below

commit 5ad3384f0f37fc1982a823c714377dcf090da931
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jan 10 16:44:00 2023 +0800

    Change config in SourceHandleTest according to IoTDBDescriptor
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++-
 .../mpp/execution/exchange/SourceHandleTest.java   | 82 ++++++++++++++--------
 2 files changed, 61 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0268ff12a5..acafb58bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.conf;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.audit.AuditLogOperation;
 import org.apache.iotdb.db.audit.AuditLogStorage;
@@ -512,6 +513,9 @@ public class IoTDBConfig {
   /** Memory allocated for operators */
   private long allocateMemoryForDataExchange = allocateMemoryForRead * 200 / 1001;
 
+  /** Max bytes of each FragmentInstance for DataExchange */
+  private long maxBytesPerFragmentInstance = allocateMemoryForDataExchange / queryThreadCount;
+
   /** Memory allocated proportion for timeIndex */
   private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
 
@@ -1529,7 +1533,12 @@ public class IoTDBConfig {
   }
 
   public long getMaxBytesPerFragmentInstance() {
-    return allocateMemoryForDataExchange / queryThreadCount;
+    return maxBytesPerFragmentInstance;
+  }
+
+  @TestOnly
+  public void setMaxBytesPerFragmentInstance(long maxBytesPerFragmentInstance) {
+    this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
   }
 
   public int getRawQueryBlockingQueueCapacity() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..1a5ad7560f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
 import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 
 import org.apache.thrift.TException;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -47,10 +49,24 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class SourceHandleTest {
+  private static final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
+
+  private static long maxBytesPerFI;
+
+  @BeforeClass
+  public static void beforeClass() {
+    maxBytesPerFI = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+    IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(5 * MOCK_TSBLOCK_SIZE);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(maxBytesPerFI);
+  }
+
   @Test
   public void testNonBlockedOneTimeReceive() {
     final String queryId = "q0";
-    final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", 
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -88,7 +104,7 @@ public class SourceHandleTest {
     // Construct a mock SourceHandleListener.
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a 
mock TsBlock.
-    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(mockTsBlockSize);
+    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
 
     SourceHandle sourceHandle =
         new SourceHandle(
@@ -109,7 +125,7 @@ public class SourceHandleTest {
     // New data blocks event arrived.
     sourceHandle.updatePendingDataBlockInfo(
         0,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -135,7 +151,7 @@ public class SourceHandleTest {
     Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
-        numOfMockTsBlock * mockTsBlockSize, 
sourceHandle.getBufferRetainedSizeInBytes());
+        numOfMockTsBlock * MOCK_TSBLOCK_SIZE, 
sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -148,7 +164,7 @@ public class SourceHandleTest {
       Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
-          (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+          (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
           sourceHandle.getBufferRetainedSizeInBytes());
     }
 
@@ -165,7 +181,6 @@ public class SourceHandleTest {
   @Test
   public void testBlockedOneTimeReceive() {
     final String queryId = "q0";
-    final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", 
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -173,10 +188,10 @@ public class SourceHandleTest {
     final String localPlanNodeId = "exchange_0";
     final TFragmentInstanceId localFragmentInstanceId = new 
TFragmentInstanceId(queryId, 0, "0");
 
-    // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize 
per query.
+    // Construct a mock LocalMemoryManager with capacity 5 * MOCK_TSBLOCK_SIZE 
per query.
     LocalMemoryManager mockLocalMemoryManager = 
Mockito.mock(LocalMemoryManager.class);
     MemoryPool spyMemoryPool =
-        Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * 
mockTsBlockSize));
+        Mockito.spy(new MemoryPool("test", 10 * MOCK_TSBLOCK_SIZE, 5 * 
MOCK_TSBLOCK_SIZE));
     
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
     IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> 
mockClientManager =
         Mockito.mock(IClientManager.class);
@@ -204,7 +219,7 @@ public class SourceHandleTest {
     // Construct a mock SourceHandleListener.
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a 
mock TsBlock.
-    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(mockTsBlockSize);
+    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
 
     SourceHandle sourceHandle =
         new SourceHandle(
@@ -217,7 +232,11 @@ public class SourceHandleTest {
             mockTsBlockSerde,
             mockSourceHandleListener,
             mockClientManager);
-    sourceHandle.setMaxBytesCanReserve(5 * mockTsBlockSize);
+    long maxBytesCanReserve =
+        Math.min(
+            5 * MOCK_TSBLOCK_SIZE,
+            
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
+    sourceHandle.setMaxBytesCanReserve(maxBytesCanReserve);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -226,7 +245,7 @@ public class SourceHandleTest {
     // New data blocks event arrived.
     sourceHandle.updatePendingDataBlockInfo(
         0,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -235,8 +254,8 @@ public class SourceHandleTest {
               queryId,
               localFragmentInstanceId.getInstanceId(),
               localPlanNodeId,
-              mockTsBlockSize,
-              5 * mockTsBlockSize);
+              MOCK_TSBLOCK_SIZE,
+              maxBytesCanReserve);
       Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
           .getDataBlock(
               Mockito.argThat(
@@ -258,17 +277,18 @@ public class SourceHandleTest {
     Assert.assertTrue(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
-    Assert.assertEquals(6 * mockTsBlockSize, 
sourceHandle.getBufferRetainedSizeInBytes());
+    Assert.assertEquals(6 * MOCK_TSBLOCK_SIZE, 
sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
-          .free(queryId, localFragmentInstanceId.getInstanceId(), 
localPlanNodeId, mockTsBlockSize);
+          .free(
+              queryId, localFragmentInstanceId.getInstanceId(), 
localPlanNodeId, MOCK_TSBLOCK_SIZE);
       sourceHandle.receive();
       try {
         if (i < 5) {
           Assert.assertEquals(
-              i == 4 ? 5 * mockTsBlockSize : 6 * mockTsBlockSize,
+              i == 4 ? 5 * MOCK_TSBLOCK_SIZE : 6 * MOCK_TSBLOCK_SIZE,
               sourceHandle.getBufferRetainedSizeInBytes());
           final int startSequenceId = 5 + i;
           Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
@@ -287,7 +307,7 @@ public class SourceHandleTest {
                               && startSequenceId + 1 == e.getEndSequenceId()));
         } else {
           Assert.assertEquals(
-              (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+              (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
               sourceHandle.getBufferRetainedSizeInBytes());
         }
       } catch (TException e) {
@@ -316,7 +336,7 @@ public class SourceHandleTest {
   @Test
   public void testMultiTimesReceive() {
     final String queryId = "q0";
-    final long mockTsBlockSize = 1024L * 1024L;
+    final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", 
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -331,7 +351,7 @@ public class SourceHandleTest {
     // Construct a mock SourceHandleListener.
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a 
mock TsBlock.
-    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(mockTsBlockSize);
+    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
     IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> 
mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
@@ -375,7 +395,7 @@ public class SourceHandleTest {
     // New data blocks event arrived in unordered manner.
     sourceHandle.updatePendingDataBlockInfo(
         numOfMockTsBlock,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -394,7 +414,7 @@ public class SourceHandleTest {
 
     sourceHandle.updatePendingDataBlockInfo(
         0,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -420,7 +440,7 @@ public class SourceHandleTest {
     Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
-        numOfMockTsBlock * 2 * mockTsBlockSize, 
sourceHandle.getBufferRetainedSizeInBytes());
+        numOfMockTsBlock * 2 * MOCK_TSBLOCK_SIZE, 
sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < 2 * numOfMockTsBlock; i++) {
@@ -433,14 +453,14 @@ public class SourceHandleTest {
       Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
-          (2 * numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+          (2 * numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
           sourceHandle.getBufferRetainedSizeInBytes());
     }
 
     // New data blocks event arrived.
     sourceHandle.updatePendingDataBlockInfo(
         numOfMockTsBlock * 2,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -466,7 +486,7 @@ public class SourceHandleTest {
     Assert.assertFalse(sourceHandle.isAborted());
     Assert.assertFalse(sourceHandle.isFinished());
     Assert.assertEquals(
-        numOfMockTsBlock * mockTsBlockSize, 
sourceHandle.getBufferRetainedSizeInBytes());
+        numOfMockTsBlock * MOCK_TSBLOCK_SIZE, 
sourceHandle.getBufferRetainedSizeInBytes());
 
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -479,7 +499,7 @@ public class SourceHandleTest {
       Assert.assertFalse(sourceHandle.isAborted());
       Assert.assertFalse(sourceHandle.isFinished());
       Assert.assertEquals(
-          (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+          (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
           sourceHandle.getBufferRetainedSizeInBytes());
     }
 
@@ -496,7 +516,7 @@ public class SourceHandleTest {
   @Test
   public void testFailedReceive() {
     final String queryId = "q0";
-    final long mockTsBlockSize = 1024L * 1024L;
+    final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", 
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -511,7 +531,7 @@ public class SourceHandleTest {
     // Construct a mock SourceHandleListener.
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a 
mock TsBlock.
-    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(mockTsBlockSize);
+    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
     IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> 
mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
@@ -549,7 +569,7 @@ public class SourceHandleTest {
     // New data blocks event arrived.
     sourceHandle.updatePendingDataBlockInfo(
         0,
-        Stream.generate(() -> mockTsBlockSize)
+        Stream.generate(() -> MOCK_TSBLOCK_SIZE)
             .limit(numOfMockTsBlock)
             .collect(Collectors.toList()));
     try {
@@ -576,7 +596,7 @@ public class SourceHandleTest {
   @Test
   public void testForceClose() {
     final String queryId = "q0";
-    final long mockTsBlockSize = 1024L * 1024L;
+    final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
     final TEndPoint remoteEndpoint =
         new TEndPoint("remote", 
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
     final TFragmentInstanceId remoteFragmentInstanceId = new 
TFragmentInstanceId(queryId, 1, "0");
@@ -613,7 +633,7 @@ public class SourceHandleTest {
     // Construct a mock SourceHandleListener.
     SourceHandleListener mockSourceHandleListener = 
Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a 
mock TsBlock.
-    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(mockTsBlockSize);
+    TsBlockSerde mockTsBlockSerde = 
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
 
     SourceHandle sourceHandle =
         new SourceHandle(

Reply via email to