This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5ad3384f0f Change config in SourceHandleTest according to IoTDBDescriptor
5ad3384f0f is described below
commit 5ad3384f0f37fc1982a823c714377dcf090da931
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jan 10 16:44:00 2023 +0800
Change config in SourceHandleTest according to IoTDBDescriptor
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++-
.../mpp/execution/exchange/SourceHandleTest.java | 82 ++++++++++++++--------
2 files changed, 61 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0268ff12a5..acafb58bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.conf;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.audit.AuditLogOperation;
import org.apache.iotdb.db.audit.AuditLogStorage;
@@ -512,6 +513,9 @@ public class IoTDBConfig {
/** Memory allocated for operators */
private long allocateMemoryForDataExchange = allocateMemoryForRead * 200 / 1001;
+ /** Max bytes of each FragmentInstance for DataExchange */
+ private long maxBytesPerFragmentInstance = allocateMemoryForDataExchange / queryThreadCount;
+
/** Memory allocated proportion for timeIndex */
private long allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
@@ -1529,7 +1533,12 @@ public class IoTDBConfig {
}
public long getMaxBytesPerFragmentInstance() {
- return allocateMemoryForDataExchange / queryThreadCount;
+ return maxBytesPerFragmentInstance;
+ }
+
+ @TestOnly
+ public void setMaxBytesPerFragmentInstance(long maxBytesPerFragmentInstance) {
+ this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
}
public int getRawQueryBlockingQueueCapacity() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..1a5ad7560f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -47,10 +49,24 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SourceHandleTest {
+ private static final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
+
+ private static long maxBytesPerFI;
+
+ @BeforeClass
+ public static void beforeClass() {
+ maxBytesPerFI = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(5 * MOCK_TSBLOCK_SIZE);
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ IoTDBDescriptor.getInstance().getConfig().setMaxBytesPerFragmentInstance(maxBytesPerFI);
+ }
+
@Test
public void testNonBlockedOneTimeReceive() {
final String queryId = "q0";
- final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote",
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -88,7 +104,7 @@ public class SourceHandleTest {
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a
mock TsBlock.
- TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(mockTsBlockSize);
+ TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(
@@ -109,7 +125,7 @@ public class SourceHandleTest {
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -135,7 +151,7 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- numOfMockTsBlock * mockTsBlockSize, sourceHandle.getBufferRetainedSizeInBytes());
+ numOfMockTsBlock * MOCK_TSBLOCK_SIZE, sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -148,7 +164,7 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+ (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
@@ -165,7 +181,6 @@ public class SourceHandleTest {
@Test
public void testBlockedOneTimeReceive() {
final String queryId = "q0";
- final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote",
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -173,10 +188,10 @@ public class SourceHandleTest {
final String localPlanNodeId = "exchange_0";
final TFragmentInstanceId localFragmentInstanceId = new
TFragmentInstanceId(queryId, 0, "0");
- // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize
per query.
+ // Construct a mock LocalMemoryManager with capacity 5 * MOCK_TSBLOCK_SIZE
per query.
LocalMemoryManager mockLocalMemoryManager =
Mockito.mock(LocalMemoryManager.class);
MemoryPool spyMemoryPool =
- Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 *
mockTsBlockSize));
+ Mockito.spy(new MemoryPool("test", 10 * MOCK_TSBLOCK_SIZE, 5 *
MOCK_TSBLOCK_SIZE));
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mockClientManager =
Mockito.mock(IClientManager.class);
@@ -204,7 +219,7 @@ public class SourceHandleTest {
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a
mock TsBlock.
- TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(mockTsBlockSize);
+ TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(
@@ -217,7 +232,11 @@ public class SourceHandleTest {
mockTsBlockSerde,
mockSourceHandleListener,
mockClientManager);
- sourceHandle.setMaxBytesCanReserve(5 * mockTsBlockSize);
+ long maxBytesCanReserve =
+ Math.min(
+ 5 * MOCK_TSBLOCK_SIZE,
+ IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
+ sourceHandle.setMaxBytesCanReserve(maxBytesCanReserve);
Assert.assertFalse(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
@@ -226,7 +245,7 @@ public class SourceHandleTest {
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -235,8 +254,8 @@ public class SourceHandleTest {
queryId,
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
- mockTsBlockSize,
- 5 * mockTsBlockSize);
+ MOCK_TSBLOCK_SIZE,
+ maxBytesCanReserve);
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.getDataBlock(
Mockito.argThat(
@@ -258,17 +277,18 @@ public class SourceHandleTest {
Assert.assertTrue(sourceHandle.isBlocked().isDone());
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
- Assert.assertEquals(6 * mockTsBlockSize,
sourceHandle.getBufferRetainedSizeInBytes());
+ Assert.assertEquals(6 * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
- .free(queryId, localFragmentInstanceId.getInstanceId(),
localPlanNodeId, mockTsBlockSize);
+ .free(
+ queryId, localFragmentInstanceId.getInstanceId(),
localPlanNodeId, MOCK_TSBLOCK_SIZE);
sourceHandle.receive();
try {
if (i < 5) {
Assert.assertEquals(
- i == 4 ? 5 * mockTsBlockSize : 6 * mockTsBlockSize,
+ i == 4 ? 5 * MOCK_TSBLOCK_SIZE : 6 * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
final int startSequenceId = 5 + i;
Mockito.verify(mockClient, Mockito.timeout(10_000).times(1))
@@ -287,7 +307,7 @@ public class SourceHandleTest {
&& startSequenceId + 1 == e.getEndSequenceId()));
} else {
Assert.assertEquals(
- (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+ (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
} catch (TException e) {
@@ -316,7 +336,7 @@ public class SourceHandleTest {
@Test
public void testMultiTimesReceive() {
final String queryId = "q0";
- final long mockTsBlockSize = 1024L * 1024L;
+ final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote",
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -331,7 +351,7 @@ public class SourceHandleTest {
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a
mock TsBlock.
- TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(mockTsBlockSize);
+ TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
@@ -375,7 +395,7 @@ public class SourceHandleTest {
// New data blocks event arrived in unordered manner.
sourceHandle.updatePendingDataBlockInfo(
numOfMockTsBlock,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -394,7 +414,7 @@ public class SourceHandleTest {
sourceHandle.updatePendingDataBlockInfo(
0,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -420,7 +440,7 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- numOfMockTsBlock * 2 * mockTsBlockSize,
sourceHandle.getBufferRetainedSizeInBytes());
+ numOfMockTsBlock * 2 * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < 2 * numOfMockTsBlock; i++) {
@@ -433,14 +453,14 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- (2 * numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+ (2 * numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
numOfMockTsBlock * 2,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -466,7 +486,7 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- numOfMockTsBlock * mockTsBlockSize,
sourceHandle.getBufferRetainedSizeInBytes());
+ numOfMockTsBlock * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
@@ -479,7 +499,7 @@ public class SourceHandleTest {
Assert.assertFalse(sourceHandle.isAborted());
Assert.assertFalse(sourceHandle.isFinished());
Assert.assertEquals(
- (numOfMockTsBlock - 1 - i) * mockTsBlockSize,
+ (numOfMockTsBlock - 1 - i) * MOCK_TSBLOCK_SIZE,
sourceHandle.getBufferRetainedSizeInBytes());
}
@@ -496,7 +516,7 @@ public class SourceHandleTest {
@Test
public void testFailedReceive() {
final String queryId = "q0";
- final long mockTsBlockSize = 1024L * 1024L;
+ final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final TEndPoint remoteEndpoint =
new TEndPoint("remote",
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
@@ -511,7 +531,7 @@ public class SourceHandleTest {
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a
mock TsBlock.
- TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(mockTsBlockSize);
+ TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mockClientManager =
Mockito.mock(IClientManager.class);
// Construct a mock client.
@@ -549,7 +569,7 @@ public class SourceHandleTest {
// New data blocks event arrived.
sourceHandle.updatePendingDataBlockInfo(
0,
- Stream.generate(() -> mockTsBlockSize)
+ Stream.generate(() -> MOCK_TSBLOCK_SIZE)
.limit(numOfMockTsBlock)
.collect(Collectors.toList()));
try {
@@ -576,7 +596,7 @@ public class SourceHandleTest {
@Test
public void testForceClose() {
final String queryId = "q0";
- final long mockTsBlockSize = 1024L * 1024L;
+ final long MOCK_TSBLOCK_SIZE = 1024L * 1024L;
final TEndPoint remoteEndpoint =
new TEndPoint("remote",
IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
final TFragmentInstanceId remoteFragmentInstanceId = new
TFragmentInstanceId(queryId, 1, "0");
@@ -613,7 +633,7 @@ public class SourceHandleTest {
// Construct a mock SourceHandleListener.
SourceHandleListener mockSourceHandleListener =
Mockito.mock(SourceHandleListener.class);
// Construct a mock TsBlockSerde that deserializes any bytebuffer into a
mock TsBlock.
- TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(mockTsBlockSize);
+ TsBlockSerde mockTsBlockSerde =
Utils.createMockTsBlockSerde(MOCK_TSBLOCK_SIZE);
SourceHandle sourceHandle =
new SourceHandle(