This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch add_log_event in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4f80e0bd2f58c1e98260c09e2b61ba9fc7cd7b42 Author: qiaojialin <[email protected]> AuthorDate: Wed Sep 14 11:12:09 2022 +0800 add event name in log --- .../db/mpp/execution/exchange/LocalSinkHandle.java | 15 ++++++----- .../mpp/execution/exchange/LocalSourceHandle.java | 12 +++++---- .../execution/exchange/MPPDataExchangeManager.java | 29 +++++++++++----------- .../mpp/execution/exchange/SharedTsBlockQueue.java | 2 +- .../db/mpp/execution/exchange/SinkHandle.java | 21 ++++++---------- .../db/mpp/execution/exchange/SourceHandle.java | 15 +++++------ .../fragment/FragmentInstanceStateMachine.java | 2 +- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 4 +-- 8 files changed, 49 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java index aaba4b8d0b..1f70ff4460 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java @@ -113,7 +113,7 @@ public class LocalSinkHandle implements ISinkHandle { if (queue.hasNoMoreTsBlocks()) { return; } - logger.info("send TsBlocks"); + logger.info("[StartSendTsBlockOnLocal]"); synchronized (this) { blocked = queue.add(tsBlock); } @@ -129,9 +129,8 @@ public class LocalSinkHandle implements ISinkHandle { public void setNoMoreTsBlocks() { synchronized (queue) { synchronized (this) { - logger.info("set noMoreTsBlocks."); + logger.info("[StartSetNoMoreTsBlocksOnLocal]"); if (aborted || closed) { - logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed); return; } queue.setNoMoreTsBlocks(true); @@ -139,12 +138,12 @@ public class LocalSinkHandle implements ISinkHandle { } } checkAndInvokeOnFinished(); - logger.info("noMoreTsBlocks has been set."); + logger.info("[EndSetNoMoreTsBlocksOnLocal]"); } @Override public void abort() { - logger.info("Sink handle is being aborted."); + logger.info("[StartAbortLocalSinkHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -155,12 +154,12 @@ public class LocalSinkHandle implements ISinkHandle { sinkHandleListener.onAborted(this); } } - logger.info("Sink handle is aborted"); + logger.info("[EndAbortLocalSinkHandle]"); } @Override public void close() { - logger.info("Sink handle is being closed."); + logger.info("[StartCloseLocalSinkHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +170,7 @@ public class LocalSinkHandle implements ISinkHandle { sinkHandleListener.onFinish(this); } } - logger.info("Sink handle is closed"); + logger.info("[EndCloseLocalSinkHandle]"); } public TFragmentInstanceId getRemoteFragmentInstanceId() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 35c7cda823..c8182acf8d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -95,7 +95,9 @@ public class LocalSourceHandle implements ISourceHandle { if (tsBlock != null) { currSequenceId++; logger.info( - "Receive {} TsdBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes()); + "[GetTsBlockFromQueue] TsBlock:{} size:{}", + currSequenceId, + tsBlock.getRetainedSizeInBytes()); } checkAndInvokeOnFinished(); return tsBlock; @@ -139,7 +141,7 @@ public class LocalSourceHandle implements ISourceHandle { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being aborted."); + logger.info("[StartAbortLocalSourceHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -150,7 +152,7 @@ public class LocalSourceHandle implements ISourceHandle { sourceHandleListener.onAborted(this); } } - logger.info("Source handle is aborted"); + logger.info("[EndAbortLocalSourceHandle]"); } } @@ -160,7 +162,7 @@ public class LocalSourceHandle implements ISourceHandle { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being closed."); + logger.info("[StartCloseLocalSourceHandle]"); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +173,7 @@ public class LocalSourceHandle implements ISourceHandle { sourceHandleListener.onFinished(this); } } - logger.info("Source handle is closed"); + logger.info("[EndCloseLocalSourceHandle]"); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index e14672ec88..c9634a75a9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -80,10 +80,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { try (SetThreadName fragmentInstanceName = new SetThreadName(createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"))) { logger.debug( - "Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.", + "[ProcessGetTsBlockRequest] sequence ID in [{}, {})", req.getStartSequenceId(), - req.getEndSequenceId(), - req.getSourceFragmentInstanceId()); + req.getEndSequenceId()); if (!sinkHandles.containsKey(req.getSourceFragmentInstanceId())) { throw new TException( "Source fragment instance not found. Fragment instance ID: " @@ -215,12 +214,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onFinished(ISourceHandle sourceHandle) { - logger.info("finished and release resources"); + logger.info("[ScHListenerOnFinish]"); if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) || !sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) .containsKey(sourceHandle.getLocalPlanNodeId())) { - logger.info("resources has already been released"); + logger.warn("[ScHListenerAlreadyReleased]"); } else { sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) @@ -234,7 +233,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onAborted(ISourceHandle sourceHandle) { - logger.info("onAborted is invoked"); + logger.info("[ScHListenerOnAbort]"); onFinished(sourceHandle); } @@ -262,28 +261,29 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onFinish(ISinkHandle sinkHandle) { - logger.info("onFinish is invoked"); + logger.info("[SkHListenerOnFinish]"); removeFromMPPDataExchangeManager(sinkHandle); context.finished(); } @Override public void onEndOfBlocks(ISinkHandle sinkHandle) { + logger.info("[SkHListenerOnEndOfTsBlocks]"); context.transitionToFlushing(); } @Override public void onAborted(ISinkHandle sinkHandle) { - logger.info("onAborted is invoked"); + logger.info("[SkHListenerOnAbort]"); removeFromMPPDataExchangeManager(sinkHandle); } private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) { - logger.info("release resources of finished sink handle"); - if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) { - logger.info("resources already been released"); + if (sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()) == null) { + logger.warn("[RemoveNoSinkHandle]"); + } else { + logger.info("[RemoveSinkHandle]"); } - sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()); } @Override @@ -493,7 +493,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { * <p>This method should be called when a fragment instance finished in an abnormal state. */ public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) { - logger.info("Force deregister fragment instance"); + logger.info("[StartForceReleaseFIDataExchangeResource]"); if (sinkHandles.containsKey(fragmentInstanceId)) { ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId); sinkHandle.abort(); @@ -502,11 +502,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { if (sourceHandles.containsKey(fragmentInstanceId)) { Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId); for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) { - logger.info("Close source handle {}", sourceHandles); + logger.info("[CloseSourceHandle] {}", entry.getKey()); entry.getValue().abort(); } sourceHandles.remove(fragmentInstanceId); } + logger.info("[EndForceReleaseFIDataExchangeResource]"); } /** @param suffix should be like [PlanNodeId].SourceHandle/SinHandle */ diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 1222ee5ef0..ed044814a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -96,7 +96,7 @@ public class SharedTsBlockQueue { /** Notify no more tsblocks will be added to the queue. */ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { - logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); + logger.info("[SignalNoMoreTsBlockOnQueue]"); if (closed) { logger.warn("queue has been destroyed"); return; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 703a04d760..63fb42784c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -167,7 +167,7 @@ public class SinkHandle implements ISinkHandle { } private void sendEndOfDataBlockEvent() throws Exception { - logger.info("send end of data block event"); + logger.info("[NotifyNoMoreTsBlock]"); int attempt = 0; TEndOfDataBlockEvent endOfDataBlockEvent = new TEndOfDataBlockEvent( @@ -193,7 +193,7 @@ public class SinkHandle implements ISinkHandle { @Override public synchronized void setNoMoreTsBlocks() { - logger.info("start to set no-more-tsblocks"); + logger.info("[StartSetNoMoreTsBlocks]"); if (aborted || closed) { return; } @@ -202,20 +202,17 @@ public class SinkHandle implements ISinkHandle { } catch (Exception e) { throw new RuntimeException("Send EndOfDataBlockEvent failed", e); } - logger.info("set noMoreTsBlocks to true"); noMoreTsBlocks = true; if (isFinished()) { - logger.info("revoke onFinish() of sinkHandleListener"); sinkHandleListener.onFinish(this); } - logger.info("revoke onEndOfBlocks() of sinkHandleListener"); sinkHandleListener.onEndOfBlocks(this); } @Override public synchronized void abort() { - logger.info("SinkHandle is being aborted."); + logger.info("[StartAbortSinkHandle]"); sequenceIdToTsBlock.clear(); aborted = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked); @@ -226,12 +223,12 @@ public class SinkHandle implements ISinkHandle { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onAborted(this); - logger.info("SinkHandle is aborted"); + logger.info("[EndAbortSinkHandle]"); } @Override public synchronized void close() { - logger.info("SinkHandle is being closed."); + logger.info("[StartCloseSinkHandle]"); sequenceIdToTsBlock.clear(); closed = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); @@ -242,7 +239,7 @@ public class SinkHandle implements ISinkHandle { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onFinish(this); - logger.info("SinkHandle is closed"); + logger.info("[EndCloseSinkHandle]"); } @Override @@ -307,7 +304,7 @@ public class SinkHandle implements ISinkHandle { freedBytes += entry.getValue().right; bufferRetainedSizeInBytes -= entry.getValue().right; iterator.remove(); - logger.info("ack TsBlock {}.", entry.getKey()); + logger.info("[ACKTsBlock] {}.", entry.getKey()); } } if (isFinished()) { @@ -381,9 +378,7 @@ public class SinkHandle implements ISinkHandle { public void run() { try (SetThreadName sinkHandleName = new SetThreadName(threadName)) { logger.info( - "Send new data block event [{}, {})", - startSequenceId, - startSequenceId + blockSizes.size()); + "[NotifyNewTsBlock] [{}, {})", startSequenceId, startSequenceId + blockSizes.size()); int attempt = 0; TNewDataBlockEvent newDataBlockEvent = new TNewDataBlockEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index bd95291eb9..9b8d6c548f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -130,13 +130,13 @@ public class SourceHandle implements ISourceHandle { return null; } long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); - logger.info("Receive {} TsBlock, size is {}", currSequenceId, retainedSize); + logger.info("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize); currSequenceId += 1; bufferRetainedSizeInBytes -= retainedSize; localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), retainedSize); if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { - logger.info("no buffered TsBlock, blocked"); + logger.info("[WaitForMoreTsBlock]"); blocked = SettableFuture.create(); } if (isFinished()) { @@ -222,7 +222,7 @@ public class SourceHandle implements ISourceHandle { } synchronized void setNoMoreTsBlocks(int lastSequenceId) { - logger.info("receive NoMoreTsBlock event. "); + logger.info("[ReceiveNoMoreTsBlockEvent]"); this.lastSequenceId = lastSequenceId; if (!blocked.isDone() && remoteTsBlockedConsumedUp()) { blocked.set(null); @@ -234,7 +234,7 @@ public class SourceHandle implements ISourceHandle { synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) { logger.info( - "receive newDataBlockEvent. [{}, {}), each size is: {}", + "[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}", startSequenceId, startSequenceId + dataBlockSizes.size(), dataBlockSizes); @@ -380,7 +380,7 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("try to get data blocks [{}, {}) ", startSequenceId, endSequenceId); + logger.info("[StartPullTsBlocksFromRemote] [{}, {}) ", startSequenceId, endSequenceId); TGetDataBlockRequest req = new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId); int attempt = 0; @@ -394,7 +394,7 @@ public class SourceHandle implements ISourceHandle { TsBlock tsBlock = serde.deserialize(byteBuffer); tsBlocks.add(tsBlock); } - logger.info("got data blocks. count: {}", tsBlocks.size()); + logger.info("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size()); executorService.submit( new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId)); synchronized (SourceHandle.this) { @@ -404,6 +404,7 @@ public class SourceHandle implements ISourceHandle { for (int i = startSequenceId; i < endSequenceId; i++) { sequenceIdToTsBlock.put(i, tsBlocks.get(i - startSequenceId)); } + logger.info("[PutTsBlocksIntoBuffer]"); if (!blocked.isDone()) { blocked.set(null); } @@ -452,7 +453,7 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("send ack data block event [{}, {}).", startSequenceId, endSequenceId); + logger.info("[SendACKTsBlock] [{}, {}).", startSequenceId, endSequenceId); int attempt = 0; TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent = new TAcknowledgeDataBlockEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java index 699777d8f0..eef52841e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java @@ -78,7 +78,7 @@ public class FragmentInstanceStateMachine { instanceState.addStateChangeListener( newState -> { try (SetThreadName threadName = new SetThreadName(fragmentInstanceId.getFullId())) { - LOGGER.info("State transfer to {}", newState); + LOGGER.info("[StateChanged] To {}", newState); } }); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index f7ba31d0d9..6f896a6e9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -134,7 +134,7 @@ public class LocalExecutionPlanner { freeMemoryForOperators -= estimatedMemorySize; LOGGER.info( String.format( - "consume memory: %d, current remaining memory: %d", + "[ConsumeMemory] consume: %d, current remaining memory: %d", estimatedMemorySize, freeMemoryForOperators)); } } @@ -148,7 +148,7 @@ public class LocalExecutionPlanner { this.freeMemoryForOperators += estimatedMemorySize; LOGGER.info( String.format( - "release memory: %d, current remaining memory: %d", + "[ReleaseMemory] release: %d, current remaining memory: %d", estimatedMemorySize, freeMemoryForOperators)); } }
