This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues_0419 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f4cc559f68ca740668e8e34c62a97608d428e1ad Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Apr 19 17:56:43 2022 +0800 remove debug log --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 16 +++++---- .../apache/iotdb/db/mpp/buffer/SourceHandle.java | 39 +++++++++------------- .../apache/iotdb/db/mpp/execution/Coordinator.java | 1 - .../iotdb/db/mpp/execution/QueryExecution.java | 14 -------- .../mpp/execution/scheduler/ClusterScheduler.java | 8 ----- .../scheduler/SimpleFragInstanceDispatcher.java | 2 -- .../execution/scheduler/SimpleQueryTerminator.java | 2 -- .../db/mpp/operator/source/SeriesScanOperator.java | 3 -- .../mpp/sql/planner/plan/DistributedQueryPlan.java | 11 ------ .../apache/iotdb/db/utils/QueryDataSetUtils.java | 7 +--- 11 files changed, 28 insertions(+), 77 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index ebfcbee4b1..57d7371a20 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -494,7 +494,7 @@ public class IoTDBConfig { private String rpcImplClassName = TSServiceImpl.class.getName(); /** indicate whether current mode is mpp */ - private boolean mppMode = true; + private boolean mppMode = false; /** Replace implementation class of influxdb protocol service */ private String influxdbImplClassName = InfluxDBServiceImpl.class.getName(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java index c569c8d04e..a72dcb9cb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java @@ -160,7 +160,11 @@ public class SinkHandle implements ISinkHandle { } private void sendEndOfDataBlockEvent() throws TException { - logger.info("[SinkHandle {}]: send end of data block event.", this.getRemotePlanNodeId()); + logger.debug( + "Send end of data block event to plan node {} of {}. {}", + remotePlanNodeId, + remoteFragmentInstanceId, + Thread.currentThread().getName()); int attempt = 0; TEndOfDataBlockEvent endOfDataBlockEvent = new TEndOfDataBlockEvent( @@ -190,7 +194,7 @@ public class SinkHandle implements ISinkHandle { @Override public void close() throws IOException { - logger.info("[SinkHandle {}]: is being closed.", this.getRemotePlanNodeId()); + logger.info("Sink handle {} is being closed.", this); if (throwable != null) { throw new IOException(throwable); } @@ -207,12 +211,12 @@ public class SinkHandle implements ISinkHandle { } catch (TException e) { throw new IOException(e); } - logger.info("[SinkHandle {}] is closed.", this.getRemotePlanNodeId()); + logger.info("Sink handle {} is closed.", this); } @Override public void abort() { - logger.info("[SinkHandle {}]: is being aborted.", this.getRemotePlanNodeId()); + logger.info("Sink handle {} is being aborted.", this); synchronized (this) { sequenceIdToTsBlock.clear(); closed = true; @@ -224,7 +228,7 @@ public class SinkHandle implements ISinkHandle { } } sinkHandleListener.onAborted(this); - logger.info("[SinkHandle {}]: is aborted.", this.getRemotePlanNodeId()); + logger.info("Sink handle {} is aborted", this); } @Override @@ -335,7 +339,7 @@ public class SinkHandle implements ISinkHandle { @Override public void run() { - logger.info( + logger.debug( "Send new data block event [{}, {}) to plan node {} of {}.", startSequenceId, startSequenceId + blockSizes.size(), diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java index e33265a6a2..37a5bce1b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java @@ -118,7 +118,6 @@ public class SourceHandle implements ISourceHandle { if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { blocked = SettableFuture.create(); - logger.info("[SourceHandle {}]: blocked is set to new Future.", localPlanNodeId); } } if (isFinished()) { @@ -206,19 +205,9 @@ public class SourceHandle implements ISourceHandle { } synchronized void setNoMoreTsBlocks(int lastSequenceId) { - logger.info( - "[SourceHandle {}]: No more TsBlock. {} ", localPlanNodeId, remoteFragmentInstanceId); this.lastSequenceId = lastSequenceId; - if (!blocked.isDone() && currSequenceId - 1 == lastSequenceId) { - logger.info( - "[SourceHandle {}]: all blocks are consumed. set blocked to null.", localPlanNodeId); + if (!blocked.isDone() && remoteTsBlockedConsumedUp()) { blocked.set(null); - } else { - logger.info( - "[SourceHandle {}]: No need to set blocked. Blocked: {}, Consumed: {} ", - localPlanNodeId, - blocked.isDone(), - currSequenceId - 1 == lastSequenceId); } } @@ -231,13 +220,11 @@ public class SourceHandle implements ISourceHandle { @Override public synchronized void close() { - logger.info("[SourceHandle {}]: closed ", localPlanNodeId); if (closed) { return; } if (blocked != null && !blocked.isDone()) { blocked.cancel(true); - logger.info("[SourceHandle {}]: blocked is cancelled.", localPlanNodeId); } sequenceIdToDataBlockSize.clear(); if (bufferRetainedSizeInBytes > 0) { @@ -252,7 +239,14 @@ public class SourceHandle implements ISourceHandle { @Override public boolean isFinished() { - return throwable == null && currSequenceId - 1 == lastSequenceId; + return throwable == null && remoteTsBlockedConsumedUp(); + } + + // Return true indicates two points: + // 1. Remote SinkHandle has told SourceHandle the total count of TsBlocks by lastSequenceId + // 2. All the TsBlocks has been consumed up + private boolean remoteTsBlockedConsumedUp() { + return currSequenceId - 1 == lastSequenceId; } String getRemoteHostname() { @@ -316,12 +310,13 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { - logger.info( - "[SourceHandle-{}]: Get data blocks [{}, {}) from {}", - localPlanNodeId, + logger.debug( + "Get data blocks [{}, {}) from {} for plan node {} of {}.", startSequenceId, endSequenceId, - remoteFragmentInstanceId); + remoteFragmentInstanceId, + localPlanNodeId, + localFragmentInstanceId); TGetDataBlockRequest req = new TGetDataBlockRequest(remoteFragmentInstanceId, startSequenceId, endSequenceId); int attempt = 0; @@ -343,7 +338,6 @@ public class SourceHandle implements ISourceHandle { } if (!blocked.isDone()) { blocked.set(null); - logger.info("[SourceHandle {}]: blocked is set null.", localPlanNodeId); } } executorService.submit( @@ -382,9 +376,8 @@ public class SourceHandle implements ISourceHandle { @Override public void run() { - logger.info( - "[SourceHandle {}]: Send ack data block event [{}, {}) to {}.", - localPlanNodeId, + logger.debug( + "Send ack data block event [{}, {}) to {}.", startSequenceId, endSequenceId, remoteFragmentInstanceId); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java index c9393c6be7..a7d09b3ccf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java @@ -98,7 +98,6 @@ public class Coordinator { partitionFetcher, schemaFetcher); queryExecutionMap.put(queryId, execution); - LOG.info("[Query: {}] start QueryExecution. Statement: {}", queryId, sql); execution.start(); return execution.getStatus(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java index 0a1528422a..a9bd605823 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java @@ -123,12 +123,9 @@ public class QueryExecution implements IQueryExecution { public void start() { doLogicalPlan(); doDistributedPlan(); - LOG.info("[{}]: Distribution Plan: {}", this, distributedPlan); if (context.getQueryType() == QueryType.READ) { - // The ResultHandle could only be initialized after distributed planning initResultHandle(); } - LOG.info("[{}]: Start to schedule.", this); schedule(); } @@ -188,7 +185,6 @@ public class QueryExecution implements IQueryExecution { // 1. The client fetch all the result and the ResultHandle is finished. // 2. The client's connection is closed that all owned QueryExecution should be cleaned up if (resultHandle != null && resultHandle.isFinished()) { - LOG.info("[QueryExecution {}]: result handle is closed", context.getQueryId()); resultHandle.close(); } } @@ -206,16 +202,9 @@ public class QueryExecution implements IQueryExecution { if (resultHandle.isClosed() || resultHandle.isFinished()) { return null; } - LOG.info("[QueryExecution {}]: try to get result.", context.getQueryId()); ListenableFuture<Void> blocked = resultHandle.isBlocked(); blocked.get(); - LOG.info( - "[QueryExecution {}]: unblock. Cancelled: {}, Done: {}", - context.getQueryId(), - blocked.isCancelled(), - blocked.isDone()); if (resultHandle.isFinished()) { - LOG.info("[QueryExecution {}]: result is null", context.getQueryId()); releaseResource(); return null; } @@ -261,16 +250,13 @@ public class QueryExecution implements IQueryExecution { SettableFuture<QueryState> future = SettableFuture.create(); stateMachine.addStateChangeListener( state -> { - LOG.info("[QueryExecution {}]: wait status callback invoked", context.getQueryId()); if (state == QueryState.RUNNING || state.isDone()) { future.set(state); } }); try { - LOG.info("[QueryExecution {}]: start to wait status", context.getQueryId()); QueryState state = future.get(); - LOG.info("[QueryExecution {}]: status got", context.getQueryId()); // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED TSStatusCode statusCode = // For WRITE, the state should be FINISHED; For READ, the state could be RUNNING diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java index 1f3ddd87a3..7fa8d3c0b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java @@ -83,19 +83,13 @@ public class ClusterScheduler implements IScheduler { @Override public void start() { - LOGGER.info( - "[{}] start to dispatch fragment instance. size: {}", - queryContext.getQueryId(), - instances.size()); stateMachine.transitionToDispatching(); Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances); // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect. // So we need to start the state fetcher after the dispatching stage. try { - LOGGER.info("[{}] wait dispatch to be finished", queryContext.getQueryId()); FragInstanceDispatchResult result = dispatchResultFuture.get(); - LOGGER.info("[{}] dispatch finished: {}", queryContext.getQueryId(), result.isSuccessful()); if (!result.isSuccessful()) { stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched")); return; @@ -108,9 +102,7 @@ public class ClusterScheduler implements IScheduler { // For the FragmentInstance of WRITE, it will be executed directly when dispatching. if (queryType == QueryType.WRITE) { - LOGGER.info("[{}] prepare to transition WRITE to finished", queryContext.getQueryId()); stateMachine.transitionToFinished(); - LOGGER.info("[{}] transition done", queryContext.getQueryId()); return; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java index 8745dfe95b..09ff1171b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java @@ -66,9 +66,7 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher { TSendFragmentInstanceReq req = new TSendFragmentInstanceReq( new TFragmentInstance(buffer), groupId, instance.getType().toString()); - LOGGER.info("start to dispatch fragment instance: {}", instance.getId()); resp = client.sendFragmentInstance(req); - LOGGER.info("dispatch complete: {}", instance.getId()); if (!resp.accepted) { break; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java index 236a118449..5ffe02080a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java @@ -50,7 +50,6 @@ public class SimpleQueryTerminator implements IQueryTerminator { @Override public Future<Boolean> terminate() { - LOGGER.info("[{}] start to submit terminate command", queryId); List<Endpoint> relatedHost = getRelatedHost(fragmentInstances); return executor.submit( @@ -64,7 +63,6 @@ public class SimpleQueryTerminator implements IQueryTerminator { endpoint.getIp(), IoTDBDescriptor.getInstance().getConfig().getInternalPort())); client.cancelQuery(new TCancelQueryReq(queryId.getId())); - LOGGER.info("[{}] cancel query from DataNode[{}]", queryId, endpoint.getIp()); } } catch (TException e) { return false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java index 2927cf0d77..01b39506fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java @@ -107,9 +107,6 @@ public class SeriesScanOperator implements DataSourceOperator { return true; } } - System.out.println( - String.format( - "[SeriesScanOperator-%s]: hasNext returned: %s", sourceId, hasCachedTsBlock)); return hasCachedTsBlock; } catch (IOException e) { throw new RuntimeException("Error happened while scanning the file", e); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java index d9a64788f0..2df53235ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java @@ -54,15 +54,4 @@ public class DistributedQueryPlan { public List<FragmentInstance> getInstances() { return instances; } - - public String toString() { - StringBuilder ret = new StringBuilder(); - ret.append("Instance Count: ").append(instances.size()); - ret.append(System.lineSeparator()); - for (FragmentInstance instance : instances) { - ret.append(instance); - ret.append(System.lineSeparator()); - } - return ret.toString(); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index d255f03ddd..eb07ba32cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -32,9 +32,6 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.BytesUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -45,7 +42,7 @@ import java.util.List; /** TimeValuePairUtils to convert between thrift format and TsFile format. */ public class QueryDataSetUtils { - private static final Logger LOG = LoggerFactory.getLogger(QueryDataSetUtils.class); + private static final int FLAG = 0x01; private QueryDataSetUtils() {} @@ -196,9 +193,7 @@ public class QueryDataSetUtils { int rowCount = 0; int[] valueOccupation = new int[columnNum]; while (rowCount < fetchSize) { - LOG.info("[ToTSDataSet {}] invoke queryExecution.getBatchResult.", queryExecution); TsBlock tsBlock = queryExecution.getBatchResult(); - LOG.info("[ToTSDataSet {}] result got. Empty: {}", queryExecution, tsBlock == null); if (tsBlock == null) { break; }
