This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch concurrentBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16917f89ea1736bf0204d06e7b1b67c86dcc481c Author: Alima777 <[email protected]> AuthorDate: Thu Mar 23 13:03:53 2023 +0800 Reduce the scope of lock in MemoryPool --- .../db/exception/runtime/MemoryLeakException.java | 27 +++++++ .../iotdb/db/mpp/common/FragmentInstanceId.java | 4 + .../execution/exchange/MPPDataExchangeManager.java | 6 ++ .../mpp/execution/exchange/SharedTsBlockQueue.java | 15 ++-- .../mpp/execution/exchange/sink/SinkChannel.java | 8 -- .../execution/exchange/source/SourceHandle.java | 8 -- .../fragment/FragmentInstanceExecution.java | 5 ++ .../iotdb/db/mpp/execution/memory/MemoryPool.java | 93 +++++++++++----------- 8 files changed, 92 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java new file mode 100644 index 0000000000..beb9a83951 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/runtime/MemoryLeakException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.runtime; + +public class MemoryLeakException extends RuntimeException { + + public MemoryLeakException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java index 793066b94a..24164c4e89 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java @@ -57,6 +57,10 @@ public class FragmentInstanceId { return instanceId; } + public String getFragmentInstanceId() { + return fragmentId + "." + instanceId; + } + public String toString() { return fullId; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index 501781b408..9a58904c79 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -470,6 +470,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { return mppDataExchangeService; } + public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) { + localMemoryManager + .getQueryPool() + .deRegisterFragmentInstanceToQueryMemoryMap(queryId, fragmentInstanceId); + } + private synchronized ISinkChannel createLocalSinkChannel( TFragmentInstanceId localFragmentInstanceId, TFragmentInstanceId remoteFragmentInstanceId, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 53d668cc86..e496ac98e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -30,12 +30,11 @@ import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import javax.annotation.concurrent.NotThreadSafe; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; - import java.util.LinkedList; import java.util.Queue; @@ -91,6 +90,10 @@ public class SharedTsBlockQueue { this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null"); this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); + localMemoryManager + .getQueryPool() + .registerPlanNodeIdToQueryMemoryMap( + fragmentInstanceId.queryId, fullFragmentInstanceId, planNodeId); } public boolean hasNoMoreTsBlocks() { @@ -260,10 +263,6 @@ public class SharedTsBlockQueue { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); } /** Destroy the queue and cancel the future. Should only be called in abnormal case */ @@ -289,10 +288,6 @@ public class SharedTsBlockQueue { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); } /** Destroy the queue and cancel the future. Should only be called in abnormal case */ diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java index 5cd28de462..b316e2bbd4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java @@ -222,10 +222,6 @@ public class SinkChannel implements ISinkChannel { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); sinkListener.onAborted(this); aborted = true; LOGGER.debug("[EndAbortSinkChannel]"); @@ -249,10 +245,6 @@ public class SinkChannel implements ISinkChannel { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); sinkListener.onFinish(this); closed = true; LOGGER.debug("[EndCloseSinkChannel]"); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java index b66dd4bce4..5d118d310c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java @@ -332,10 +332,6 @@ public class SourceHandle implements ISourceHandle { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); aborted = true; sourceHandleListener.onAborted(this); } @@ -369,10 +365,6 @@ public class SourceHandle implements ISourceHandle { bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } - localMemoryManager - .getQueryPool() - .clearMemoryReservationMap( - localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); closed = true; executorService.submit(new SendCloseSinkChannelEventTask()); currSequenceId = lastSequenceId + 1; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index f9c17c9963..b402f0892e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.driver.IDriver; +import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService; import org.apache.iotdb.db.mpp.execution.exchange.sink.ISink; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.utils.SetThreadName; @@ -137,6 +138,10 @@ public class FragmentInstanceExecution { context.releaseResource(); // help for gc drivers = null; + MPPDataExchangeService.getInstance() + .getMPPDataExchangeManager() + .deRegisterFragmentInstanceFromMemoryPool( + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId()); if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java index 37dfc13d9a..a9acf90e43 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java @@ -20,19 +20,18 @@ package org.apache.iotdb.db.mpp.execution.memory; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.exception.runtime.MemoryLeakException; import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.Nullable; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -149,6 +148,44 @@ public class MemoryPool { return id; } + /** + * Before executing, we register memory map which is related to queryId, fragmentInstanceId, and + * planNodeId to queryMemoryReservationsMap first. + */ + public synchronized void registerPlanNodeIdToQueryMemoryMap( + String queryId, String fragmentInstanceId, String planNodeId) { + queryMemoryReservations + .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>()) + .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>()) + .putIfAbsent(planNodeId, 0L); + } + + /** + * If all fragmentInstanceIds related to one queryId have been registered, when the last fragment + * instance is deregister, the queryId can be cleared. + * + * <p>If some fragmentInstanceIds have not been registered when queryId is cleared, they will + * register queryId again with lock, so there is no concurrency problem. + */ + public void deRegisterFragmentInstanceToQueryMemoryMap( + String queryId, String fragmentInstanceId) { + Map<String, Long> planNodeRelatedMemory = + queryMemoryReservations.get(queryId).get(fragmentInstanceId); + for (Long memoryReserved : planNodeRelatedMemory.values()) { + if (memoryReserved != 0) { + throw new MemoryLeakException( + "PlanNode related memory is not zero when deregister fragment instance from query memory pool."); + } + } + synchronized (queryMemoryReservations) { + Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId); + queryRelatedMemory.remove(fragmentInstanceId); + if (queryRelatedMemory.isEmpty()) { + queryMemoryReservations.remove(queryId); + } + } + } + /** * Reserve memory with bytesToReserve. * @@ -338,47 +375,7 @@ public class MemoryPool { return maxBytes - remainingBytes.get(); } - public void clearMemoryReservationMap( - String queryId, String fragmentInstanceId, String planNodeId) { - Map<String, Map<String, Long>> instanceBytesReserved = queryMemoryReservations.get(queryId); - Map<String, Long> planNodeIdToBytesReserved = - queryMemoryReservations - .getOrDefault(queryId, Collections.emptyMap()) - .get(fragmentInstanceId); - if (instanceBytesReserved == null || planNodeIdToBytesReserved == null) { - return; - } - - Long newValue = - planNodeIdToBytesReserved.computeIfPresent( - planNodeId, - (k, memoryReserved) -> { - if (memoryReserved == 0) { - return null; - } - return memoryReserved; - }); - if (newValue == null) { - instanceBytesReserved.computeIfPresent( - fragmentInstanceId, - (k, kPlanNodeBytesReserved) -> { - if (kPlanNodeBytesReserved.isEmpty()) { - return null; - } - return kPlanNodeBytesReserved; - }); - queryMemoryReservations.computeIfPresent( - queryId, - (k, kInstanceBytesReserved) -> { - if (kInstanceBytesReserved.isEmpty()) { - return null; - } - return kInstanceBytesReserved; - }); - } - } - - private boolean tryReserve( + public boolean tryReserve( String queryId, String fragmentInstanceId, String planNodeId, @@ -388,8 +385,8 @@ public class MemoryPool { long queryRemainingBytes = maxBytesCanReserve - queryMemoryReservations - .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>()) - .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>()) + .get(queryId) + .get(fragmentInstanceId) .merge(planNodeId, bytesToReserve, Long::sum); return tryRemainingBytes >= 0 && queryRemainingBytes >= 0; } @@ -397,8 +394,8 @@ public class MemoryPool { private void rollbackReserve( String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve) { queryMemoryReservations - .computeIfAbsent(queryId, x -> new ConcurrentHashMap<>()) - .computeIfAbsent(fragmentInstanceId, x -> new ConcurrentHashMap<>()) + .get(queryId) + .get(fragmentInstanceId) .merge(planNodeId, -bytesToReserve, Long::sum); remainingBytes.addAndGet(bytesToReserve); }
