This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_0815_data_lost in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8890b497c7507b9eb8dca77031d6f88285a976c1 Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Aug 15 16:17:19 2022 +0800 change query info log to debug --- .../db/mpp/execution/exchange/LocalSinkHandle.java | 16 ++++++++-------- .../db/mpp/execution/exchange/LocalSourceHandle.java | 10 +++++----- .../execution/exchange/MPPDataExchangeManager.java | 16 ++++++++-------- .../mpp/execution/exchange/SharedTsBlockQueue.java | 2 +- .../iotdb/db/mpp/execution/exchange/SinkHandle.java | 20 ++++++++++---------- .../db/mpp/execution/exchange/SourceHandle.java | 14 +++++++------- .../fragment/FragmentInstanceStateMachine.java | 2 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 8 ++++---- .../iotdb/db/mpp/plan/execution/QueryExecution.java | 10 +++++----- .../db/mpp/plan/scheduler/ClusterScheduler.java | 2 +- .../plan/scheduler/FixedRateFragInsStateTracker.java | 2 +- .../db/mpp/plan/scheduler/StandaloneScheduler.java | 4 ++-- 12 files changed, 53 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java index aaba4b8d0b..653f61d740 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java @@ -113,7 +113,7 @@ public class LocalSinkHandle implements ISinkHandle { if (queue.hasNoMoreTsBlocks()) { return; } - logger.info("send TsBlocks"); + logger.debug("send TsBlocks"); synchronized (this) { blocked = queue.add(tsBlock); } @@ -129,9 +129,9 @@ public class LocalSinkHandle implements ISinkHandle { public void setNoMoreTsBlocks() { synchronized (queue) { synchronized (this) { - logger.info("set noMoreTsBlocks."); + logger.debug("set noMoreTsBlocks."); if (aborted || closed) { - logger.info("SinkHandle has been aborted={} or closed={}.", aborted, closed); + logger.debug("SinkHandle has been aborted={} or closed={}.", aborted, closed); return; } queue.setNoMoreTsBlocks(true); @@ -139,12 +139,12 @@ public class LocalSinkHandle implements ISinkHandle { } } checkAndInvokeOnFinished(); - logger.info("noMoreTsBlocks has been set."); + logger.debug("noMoreTsBlocks has been set."); } @Override public void abort() { - logger.info("Sink handle is being aborted."); + logger.debug("Sink handle is being aborted."); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -155,12 +155,12 @@ public class LocalSinkHandle implements ISinkHandle { sinkHandleListener.onAborted(this); } } - logger.info("Sink handle is aborted"); + logger.debug("Sink handle is aborted"); } @Override public void close() { - logger.info("Sink handle is being closed."); + logger.debug("Sink handle is being closed."); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +171,7 @@ public class LocalSinkHandle implements ISinkHandle { sinkHandleListener.onFinish(this); } } - logger.info("Sink handle is closed"); + logger.debug("Sink handle is closed"); } public TFragmentInstanceId getRemoteFragmentInstanceId() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index 35c7cda823..a6e6b4bf6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -94,7 +94,7 @@ public class LocalSourceHandle implements ISourceHandle { } if (tsBlock != null) { currSequenceId++; - logger.info( + logger.debug( "Receive {} TsdBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes()); } checkAndInvokeOnFinished(); @@ -139,7 +139,7 @@ public class LocalSourceHandle implements ISourceHandle { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being aborted."); + logger.debug("Source handle is being aborted."); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -150,7 +150,7 @@ public class LocalSourceHandle implements ISourceHandle { sourceHandleListener.onAborted(this); } } - logger.info("Source handle is aborted"); + logger.debug("Source handle is aborted"); } } @@ -160,7 +160,7 @@ public class LocalSourceHandle implements ISourceHandle { return; } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("Source handle is being closed."); + logger.debug("Source handle is being closed."); synchronized (queue) { synchronized (this) { if (aborted || closed) { @@ -171,7 +171,7 @@ public class LocalSourceHandle implements ISourceHandle { sourceHandleListener.onFinished(this); } } - logger.info("Source handle is closed"); + logger.debug("Source handle is closed"); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index b2b32222d9..ad3bb7c078 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -215,12 +215,12 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onFinished(ISourceHandle sourceHandle) { - logger.info("finished and release resources"); + logger.debug("finished and release resources"); if (!sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) || !sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) .containsKey(sourceHandle.getLocalPlanNodeId())) { - logger.info("resources has already been released"); + logger.debug("resources has already been released"); } else { sourceHandles .get(sourceHandle.getLocalFragmentInstanceId()) @@ -234,7 +234,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onAborted(ISourceHandle sourceHandle) { - logger.info("onAborted is invoked"); + logger.debug("onAborted is invoked"); onFinished(sourceHandle); } @@ -273,14 +273,14 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onAborted(ISinkHandle sinkHandle) { - logger.info("onAborted is invoked"); + logger.debug("onAborted is invoked"); removeFromMPPDataExchangeManager(sinkHandle); } private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) { - logger.info("release resources of finished sink handle"); + logger.debug("release resources of finished sink handle"); if (!sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) { - logger.info("resources already been released"); + logger.debug("resources already been released"); } sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId()); } @@ -492,7 +492,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { * <p>This method should be called when a fragment instance finished in an abnormal state. */ public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) { - logger.info("Force deregister fragment instance"); + logger.debug("Force deregister fragment instance"); if (sinkHandles.containsKey(fragmentInstanceId)) { ISinkHandle sinkHandle = sinkHandles.get(fragmentInstanceId); sinkHandle.abort(); @@ -501,7 +501,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { if (sourceHandles.containsKey(fragmentInstanceId)) { Map<String, ISourceHandle> planNodeIdToSourceHandle = sourceHandles.get(fragmentInstanceId); for (Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) { - logger.info("Close source handle {}", sourceHandles); + logger.debug("Close source handle {}", sourceHandles); entry.getValue().abort(); } sourceHandles.remove(fragmentInstanceId); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 6a594b4bf6..ee5b04aa5a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -92,7 +92,7 @@ public class SharedTsBlockQueue { /** Notify no more tsblocks will be added to the queue. */ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) { - logger.info("SharedTsBlockQueue receive no more TsBlocks signal."); + logger.debug("SharedTsBlockQueue receive no more TsBlocks signal."); if (closed) { throw new IllegalStateException("queue has been destroyed"); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 6ccddaf44e..9f0929cefb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -170,7 +170,7 @@ public class SinkHandle implements ISinkHandle { } private void sendEndOfDataBlockEvent() throws Exception { - logger.info("send end of data block event"); + logger.debug("send end of data block event"); int attempt = 0; TEndOfDataBlockEvent endOfDataBlockEvent = new TEndOfDataBlockEvent( @@ -196,7 +196,7 @@ public class SinkHandle implements ISinkHandle { @Override public synchronized void setNoMoreTsBlocks() { - logger.info("start to set no-more-tsblocks"); + logger.debug("start to set no-more-tsblocks"); if (aborted || closed) { return; } @@ -205,20 +205,20 @@ public class SinkHandle implements ISinkHandle { } catch (Exception e) { throw new RuntimeException("Send EndOfDataBlockEvent failed", e); } - logger.info("set noMoreTsBlocks to true"); + logger.debug("set noMoreTsBlocks to true"); noMoreTsBlocks = true; if (isFinished()) { - logger.info("revoke onFinish() of sinkHandleListener"); + logger.debug("revoke onFinish() of sinkHandleListener"); sinkHandleListener.onFinish(this); } - logger.info("revoke onEndOfBlocks() of sinkHandleListener"); + logger.debug("revoke onEndOfBlocks() of sinkHandleListener"); sinkHandleListener.onEndOfBlocks(this); } @Override public synchronized void abort() { - logger.info("SinkHandle is being aborted."); + logger.debug("SinkHandle is being aborted."); sequenceIdToTsBlock.clear(); aborted = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked); @@ -229,12 +229,12 @@ public class SinkHandle implements ISinkHandle { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onAborted(this); - logger.info("SinkHandle is aborted"); + logger.debug("SinkHandle is aborted"); } @Override public void close() { - logger.info("SinkHandle is being closed."); + logger.debug("SinkHandle is being closed."); sequenceIdToTsBlock.clear(); closed = true; bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked); @@ -245,7 +245,7 @@ public class SinkHandle implements ISinkHandle { bufferRetainedSizeInBytes = 0; } sinkHandleListener.onFinish(this); - logger.info("SinkHandle is closed"); + logger.debug("SinkHandle is closed"); } @Override @@ -368,7 +368,7 @@ public class SinkHandle implements ISinkHandle { @Override public void run() { try (SetThreadName sinkHandleName = new SetThreadName(threadName)) { - logger.info( + logger.debug( "Send new data block event [{}, {})", startSequenceId, startSequenceId + blockSizes.size()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index 1dc68c27a3..8cf41740a8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -128,7 +128,7 @@ public class SourceHandle implements ISourceHandle { if (tsBlock == null) { return null; } - logger.info( + logger.debug( "Receive {} TsBlock, size is {}", currSequenceId, tsBlock.getRetainedSizeInBytes()); currSequenceId += 1; bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes(); @@ -137,7 +137,7 @@ public class SourceHandle implements ISourceHandle { .free(localFragmentInstanceId.getQueryId(), tsBlock.getRetainedSizeInBytes()); if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { - logger.info("no buffered TsBlock, blocked"); + logger.debug("no buffered TsBlock, blocked"); blocked = SettableFuture.create(); } if (isFinished()) { @@ -208,7 +208,7 @@ public class SourceHandle implements ISourceHandle { } synchronized void setNoMoreTsBlocks(int lastSequenceId) { - logger.info("receive NoMoreTsBlock event. "); + logger.debug("receive NoMoreTsBlock event. "); this.lastSequenceId = lastSequenceId; if (!blocked.isDone() && remoteTsBlockedConsumedUp()) { blocked.set(null); @@ -219,7 +219,7 @@ public class SourceHandle implements ISourceHandle { } synchronized void updatePendingDataBlockInfo(int startSequenceId, List<Long> dataBlockSizes) { - logger.info( + logger.debug( "receive newDataBlockEvent. [{}, {}), each size is: {}", startSequenceId, startSequenceId + dataBlockSizes.size(), @@ -366,7 +366,7 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("try to get data blocks [{}, {}) ", startSequenceId, endSequenceId); + logger.debug("try to get data blocks [{}, {}) ", startSequenceId, endSequenceId); TGetDataBlockRequest req = new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId); int attempt = 0; @@ -380,7 +380,7 @@ public class SourceHandle implements ISourceHandle { TsBlock tsBlock = serde.deserialize(byteBuffer); tsBlocks.add(tsBlock); } - logger.info("got data blocks. count: {}", tsBlocks.size()); + logger.debug("got data blocks. count: {}", tsBlocks.size()); executorService.submit( new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId)); synchronized (SourceHandle.this) { @@ -433,7 +433,7 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - logger.info("send ack data block event [{}, {}).", startSequenceId, endSequenceId); + logger.debug("send ack data block event [{}, {}).", startSequenceId, endSequenceId); int attempt = 0; TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent = new TAcknowledgeDataBlockEvent( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java index 699777d8f0..d903f6eb13 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java @@ -78,7 +78,7 @@ public class FragmentInstanceStateMachine { instanceState.addStateChangeListener( newState -> { try (SetThreadName threadName = new SetThreadName(fragmentInstanceId.getFullId())) { - LOGGER.info("State transfer to {}", newState); + LOGGER.debug("State transfer to {}", newState); } }); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 4927629622..79bf3965af 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -172,9 +172,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setStatement(queryStatement); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.debug("{} fetch query schema...", getLogHeader()); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.debug("{} fetch schema done", getLogHeader()); // If there is no leaf node in the schema tree, the query should be completed immediately if (schemaTree.isEmpty()) { analysis.setFinishQueryAfterAnalyze(true); @@ -1380,9 +1380,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> if (showTimeSeriesStatement.isOrderByHeat()) { patternTree.constructTree(); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.debug("{} fetch query schema...", getLogHeader()); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.debug("{} fetch schema done", getLogHeader()); List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement(); Set<Expression> sourceExpressions = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 4a6ac5fdc0..925f6edb40 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -153,7 +153,7 @@ public class QueryExecution implements IQueryExecution { if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { - logger.info("release resource because Query State is: {}", state); + logger.debug("release resource because Query State is: {}", state); releaseResource(); } } @@ -260,7 +260,7 @@ public class QueryExecution implements IQueryExecution { LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers); this.logicalPlan = planner.plan(this.analysis); if (isQuery()) { - logger.info( + logger.debug( "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode())); } } @@ -270,7 +270,7 @@ public class QueryExecution implements IQueryExecution { DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); this.distributedPlan = planner.planFragments(); if (isQuery()) { - logger.info( + logger.debug( "distribution plan done. Fragment instance count is {}, details is: \n {}", distributedPlan.getInstances().size(), printFragmentInstances(distributedPlan.getInstances())); @@ -326,7 +326,7 @@ public class QueryExecution implements IQueryExecution { while (true) { try { if (resultHandle.isAborted()) { - logger.info("resultHandle for client is aborted"); + logger.debug("resultHandle for client is aborted"); stateMachine.transitionToAborted(); if (stateMachine.getFailureStatus() != null) { throw new IoTDBException( @@ -339,7 +339,7 @@ public class QueryExecution implements IQueryExecution { // Once the resultHandle is finished, we should transit the state of this query to // FINISHED. // So that the corresponding cleanup work could be triggered. - logger.info("resultHandle for client is finished"); + logger.debug("resultHandle for client is finished"); stateMachine.transitionToFinished(); return Optional.empty(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 417b6b5e18..9d87d73f24 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -137,7 +137,7 @@ public class ClusterScheduler implements IScheduler { // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance this.stateTracker.start(); - logger.info("state tracker starts"); + logger.debug("state tracker starts"); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 42a98f6b38..f4da4a555d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -94,7 +94,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { FragmentInstanceState state = fetchState(instance); if (needPrintState(lastState, state, durationToLastPrintInMS)) { - logger.info("State is {}", state); + logger.debug("State is {}", state); lastState = state; durationToLastPrintInMS = 0; } else { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java index 09c7da7b43..89a69f1022 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java @@ -114,13 +114,13 @@ public class StandaloneScheduler implements IScheduler { } // The FragmentInstances has been dispatched successfully to corresponding host, we mark the stateMachine.transitionToRunning(); - LOGGER.info("{} transit to RUNNING", getLogHeader()); + LOGGER.debug("{} transit to RUNNING", getLogHeader()); instances.forEach( instance -> stateMachine.initialFragInstanceState( instance.getId(), FragmentInstanceState.RUNNING)); this.stateTracker.start(); - LOGGER.info("{} state tracker starts", getLogHeader()); + LOGGER.debug("{} state tracker starts", getLogHeader()); break; case WRITE: try {
