This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch QueryException in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 50ed598efc04e8b6aa05d82cac36fe57be6db919 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 12 13:53:27 2022 +0800 make query error more concrete --- .../db/mpp/execution/exchange/ISourceHandle.java | 12 +- .../mpp/execution/exchange/LocalSourceHandle.java | 45 +++++- .../execution/exchange/MPPDataExchangeManager.java | 12 +- .../mpp/execution/exchange/SharedTsBlockQueue.java | 25 +++- .../db/mpp/execution/exchange/SourceHandle.java | 48 +++++-- .../fragment/FragmentInstanceContext.java | 7 + .../fragment/FragmentInstanceExecution.java | 12 +- .../fragment/FragmentInstanceManager.java | 3 +- .../operator/source/ExchangeOperator.java | 2 +- .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 8 +- .../db/mpp/plan/execution/IQueryExecution.java | 3 +- .../db/mpp/plan/execution/QueryExecution.java | 32 ++++- .../plan/execution/memory/MemorySourceHandle.java | 3 + .../db/mpp/plan/scheduler/ClusterScheduler.java | 1 + .../scheduler/FragmentInstanceDispatcherImpl.java | 158 +++++++++++---------- .../mpprest/handler/QueryDataSetHandler.java | 21 +-- .../service/thrift/impl/ClientRPCServiceImpl.java | 2 +- .../impl/DataNodeInternalRPCServiceImpl.java | 80 ++++++++--- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 3 +- thrift/src/main/thrift/datanode.thrift | 1 + 20 files changed, 339 insertions(+), 139 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java index be62bff4ff..f2538c8db7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISourceHandle.java @@ -50,8 +50,18 @@ public interface ISourceHandle { boolean isAborted(); /** - * Abort the handle. Discard all tsblocks which may still be in the memory buffer and complete the + * Abort the handle. 
Discard all tsblocks which may still be in the memory buffer and cancel the * future returned by {@link #isBlocked()}. + * + * <p>Should only be called in abnormal case */ void abort(); + + /** + * Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the + * future returned by {@link #isBlocked()}. + * + * <p>Should only be called in normal case + */ + void close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index c101e7b644..95056cf44b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -43,6 +43,8 @@ public class LocalSourceHandle implements ISourceHandle { private final SharedTsBlockQueue queue; private boolean aborted = false; + private boolean closed = false; + private int currSequenceId; private final String threadName; @@ -81,9 +83,8 @@ public class LocalSourceHandle implements ISourceHandle { @Override public TsBlock receive() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted) { - throw new IllegalStateException("Source handle is aborted."); - } + checkState(); + if (!queue.isBlocked().isDone()) { throw new IllegalStateException("Source handle is blocked."); } @@ -123,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle { @Override public ListenableFuture<?> isBlocked() { - if (aborted) { - throw new IllegalStateException("Source handle is closed."); - } + checkState(); return nonCancellationPropagating(queue.isBlocked()); } @@ -136,6 +135,9 @@ public class LocalSourceHandle implements ISourceHandle { @Override public void abort() { + if (aborted || closed) { + return; + } try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { logger.info("Source handle is being 
aborted."); synchronized (queue) { @@ -143,7 +145,7 @@ public class LocalSourceHandle implements ISourceHandle { if (aborted) { return; } - queue.destroy(); + queue.abort(); aborted = true; sourceHandleListener.onAborted(this); } @@ -152,6 +154,35 @@ public class LocalSourceHandle implements ISourceHandle { } } + @Override + public void close() { + if (aborted || closed) { + return; + } + try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { + logger.info("Source handle is being closed."); + synchronized (queue) { + synchronized (this) { + if (aborted) { + return; + } + queue.destroy(); + closed = true; + sourceHandleListener.onFinished(this); + } + } + logger.info("Source handle is closed"); + } + } + + private void checkState() { + if (aborted) { + throw new IllegalStateException("Source handle is aborted."); + } else if (closed) { + throw new IllegalStateException("Source Handle is closed."); + } + } + public TFragmentInstanceId getRemoteFragmentInstanceId() { return remoteFragmentInstanceId; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index 8aa2a6fa3a..7f087e2a9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -141,7 +141,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { || sourceHandles .get(e.getTargetFragmentInstanceId()) .get(e.getTargetPlanNodeId()) - .isAborted()) { + .isAborted() + || sourceHandles + .get(e.getTargetFragmentInstanceId()) + .get(e.getTargetPlanNodeId()) + .isFinished()) { // In some scenario, when the SourceHandle sends the data block ACK event, its upstream // may // have already been stopped. 
For example, in the query whit LimitOperator, the downstream @@ -176,7 +180,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { || sourceHandles .get(e.getTargetFragmentInstanceId()) .get(e.getTargetPlanNodeId()) - .isAborted()) { + .isAborted() + || sourceHandles + .get(e.getTargetFragmentInstanceId()) + .get(e.getTargetPlanNodeId()) + .isFinished()) { logger.warn( "received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found", e.getTargetFragmentInstanceId()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index 7aa96230ba..7cac64a6e3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -152,7 +152,7 @@ public class SharedTsBlockQueue { return blockedOnMemory; } - /** Destroy the queue and cancel the future. */ + /** Destroy the queue and complete the future. Should only be called in normal case */ public void destroy() { if (destroyed) { return; @@ -172,4 +172,27 @@ public class SharedTsBlockQueue { bufferRetainedSizeInBytes = 0; } } + + // TODO add Throwable t as a parameter of this method, and then call blocked.setException(t); + // instead of blocked.cancel(true); + /** Destroy the queue and cancel the future. 
Should only be called in abnormal case */ + public void abort() { + if (destroyed) { + return; + } + destroyed = true; + if (!blocked.isDone()) { + blocked.cancel(true); + } + if (blockedOnMemory != null) { + bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory); + } + queue.clear(); + if (bufferRetainedSizeInBytes > 0L) { + localMemoryManager + .getQueryPool() + .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes); + bufferRetainedSizeInBytes = 0; + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index facd0c1e0f..15ac9e0529 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -86,6 +86,8 @@ public class SourceHandle implements ISourceHandle { private int lastSequenceId = Integer.MAX_VALUE; private boolean aborted = false; + private boolean closed = false; + public SourceHandle( TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, @@ -116,9 +118,8 @@ public class SourceHandle implements ISourceHandle { public synchronized TsBlock receive() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted) { - throw new IllegalStateException("Source handle is aborted."); - } + checkState(); + if (!blocked.isDone()) { throw new IllegalStateException("Source handle is blocked."); } @@ -149,7 +150,7 @@ public class SourceHandle implements ISourceHandle { private synchronized void trySubmitGetDataBlocksTask() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted) { + if (aborted || closed) { return; } if (blockedOnMemory != null && !blockedOnMemory.isDone()) { @@ -202,9 +203,7 @@ public class SourceHandle implements ISourceHandle { @Override public synchronized
ListenableFuture<?> isBlocked() { - if (aborted) { - throw new IllegalStateException("Source handle is aborted."); - } + checkState(); return nonCancellationPropagating(blocked); } @@ -234,7 +233,7 @@ public class SourceHandle implements ISourceHandle { @Override public synchronized void abort() { try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted) { + if (aborted || closed) { return; } if (blocked != null && !blocked.isDone()) { @@ -255,6 +254,31 @@ public class SourceHandle implements ISourceHandle { } } + @Override + public synchronized void close() { + try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { + if (aborted || closed) { + return; + } + if (blocked != null && !blocked.isDone()) { + blocked.set(null); + } + if (blockedOnMemory != null) { + bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory); + } + sequenceIdToDataBlockSize.clear(); + if (bufferRetainedSizeInBytes > 0) { + localMemoryManager + .getQueryPool() + .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes); + bufferRetainedSizeInBytes = 0; + } + closed = true; + currSequenceId = lastSequenceId + 1; + sourceHandleListener.onFinished(this); + } + } + @Override public boolean isFinished() { return remoteTsBlockedConsumedUp(); @@ -293,6 +317,14 @@ public class SourceHandle implements ISourceHandle { return aborted; } + private void checkState() { + if (aborted) { + throw new IllegalStateException("Source handle is aborted."); + } else if (closed) { + throw new IllegalStateException("SourceHandle is closed."); + } + } + @Override public String toString() { return String.format( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java index ffc9d101b3..0d5cdf54d1 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -150,6 +151,12 @@ public class FragmentInstanceContext extends QueryContext { stateMachine.failed(cause); } + public String getFailedCause() { + return stateMachine.getFailureCauses().stream() + .map(Throwable::getMessage) + .collect(Collectors.joining("; ")); + } + public void finished() { stateMachine.finished(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index ab51fa842d..ac70a848dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -84,12 +84,8 @@ public class FragmentInstanceExecution { } public FragmentInstanceInfo getInstanceInfo() { - return new FragmentInstanceInfo(stateMachine.getState(), context.getEndTime()); - } - - public void failed(Throwable cause) { - requireNonNull(cause, "cause is null"); - stateMachine.failed(cause); + return new FragmentInstanceInfo( + stateMachine.getState(), context.getEndTime(), context.getFailedCause()); } public void cancel() { @@ -121,7 +117,9 @@ public class FragmentInstanceExecution { sinkHandle.abort(); // help for gc sinkHandle = null; - scheduler.abortFragmentInstance(instanceId); + if (newState.isFailed()) { + scheduler.abortFragmentInstance(instanceId); + } } }); } diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index 504d3e4676..51b689eda2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -194,8 +194,9 @@ public class FragmentInstanceManager { } private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) { + FragmentInstanceContext context = instanceContext.get(instanceId); return new FragmentInstanceInfo( - FragmentInstanceState.FAILED, instanceContext.get(instanceId).getEndTime()); + FragmentInstanceState.FAILED, context.getEndTime(), context.getFailedCause()); } private void removeOldInstances() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java index 3579a642bf..c72063caeb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java @@ -81,6 +81,6 @@ public class ExchangeOperator implements SourceOperator { @Override public void close() throws Exception { - sourceHandle.abort(); + sourceHandle.close(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 19356f6471..2532cc4cc0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.analyze; import 
org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -119,7 +120,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { while (coordinator.getQueryExecution(queryId).hasNextResult()) { // The query will be transited to FINISHED when invoking getBatchResult() at the last time // So we don't need to clean up it manually - Optional<TsBlock> tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + Optional<TsBlock> tsBlock; + try { + tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + } catch (IoTDBException e) { + throw new RuntimeException("Fetch Schema failed. ", e); + } if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { break; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java index 70fffda9b6..ba0380d015 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.plan.execution; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.mpp.common.header.DatasetHeader; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -34,7 +35,7 @@ public interface IQueryExecution { ExecutionResult getStatus(); - Optional<TsBlock> getBatchResult(); + Optional<TsBlock> getBatchResult() throws IoTDBException; boolean hasNextResult(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 4bae3eeafc..fe6d48afa2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -64,7 +65,6 @@ import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -74,6 +74,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Throwables.throwIfUnchecked; import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode; @@ -280,11 +281,22 @@ public class QueryExecution implements IQueryExecution { * implemented with DataStreamManager) */ @Override - public Optional<TsBlock> getBatchResult() { + public Optional<TsBlock> getBatchResult() throws IoTDBException { + checkArgument(resultHandle != null, "ResultHandle in Coordinator should be init firstly."); // iterate until we get a non-nullable TsBlock or result is finished while (true) { try { - if (resultHandle == null || resultHandle.isAborted() || resultHandle.isFinished()) { + if (resultHandle.isAborted()) { + logger.info("resultHandle for client is aborted"); + 
stateMachine.transitionToAborted(); + if (stateMachine.getFailureStatus() != null) { + throw new IoTDBException( + stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code); + } else { + throw new IoTDBException( + stateMachine.getFailureMessage(), TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode()); + } + } else if (resultHandle.isFinished()) { // Once the resultHandle is finished, we should transit the state of this query to // FINISHED. // So that the corresponding cleanup work could be triggered. @@ -292,6 +304,7 @@ public class QueryExecution implements IQueryExecution { stateMachine.transitionToFinished(); return Optional.empty(); } + ListenableFuture<?> blocked = resultHandle.isBlocked(); blocked.get(); if (!resultHandle.isFinished()) { @@ -305,13 +318,17 @@ public class QueryExecution implements IQueryExecution { } } catch (ExecutionException | CancellationException e) { stateMachine.transitionToFailed(e); + if (stateMachine.getFailureStatus() != null) { + throw new IoTDBException( + stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code); + } Throwable t = e.getCause() == null ? e : e.getCause(); throwIfUnchecked(t); - throw new RuntimeException(t); + throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode()); } catch (InterruptedException e) { stateMachine.transitionToFailed(e); Thread.currentThread().interrupt(); - throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e)); + throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode()); } } } @@ -363,7 +380,10 @@ public class QueryExecution implements IQueryExecution { } return new ExecutionResult( context.getQueryId(), - RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())); + stateMachine.getFailureStatus() == null + ? 
RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage()) + : stateMachine.getFailureStatus()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java index 588407eae1..2cf224962b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java @@ -77,4 +77,7 @@ public class MemorySourceHandle implements ISourceHandle { @Override public void abort() {} + + @Override + public void close() {} } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 0a6d51eb4a..73730864d2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -103,6 +103,7 @@ public class ClusterScheduler implements IScheduler { if (result.getFailureStatus() != null) { stateMachine.transitionToFailed(result.getFailureStatus()); } else { + // won't get into here stateMachine.transitionToFailed( new IllegalStateException("Fragment cannot be dispatched")); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index d3f0d21e86..21a4436a6b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import 
org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -46,16 +47,13 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import com.google.common.util.concurrent.SettableFuture; import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -102,67 +100,45 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { return executor.submit( () -> { for (FragmentInstance instance : instances) { - boolean accepted = dispatchOneInstance(instance); - if (!accepted) { - return new FragInstanceDispatchResult(false); + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + dispatchOneInstance(instance); + } catch (FragmentInstanceDispatchException e) { + return new FragInstanceDispatchResult(e.getFailureStatus()); + } catch (Throwable t) { + logger.error("cannot dispatch FI for read operation", t); + return new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); } } return new FragInstanceDispatchResult(true); }); } - // TODO: (xingtanzjr) Return the detailed write states for each FragmentInstance - private Future<FragInstanceDispatchResult> dispatchWrite(List<FragmentInstance> instances) { - List<Future<Boolean>> futures = new LinkedList<>(); - for (FragmentInstance instance : instances) { - futures.add(writeOperationExecutor.submit(() -> dispatchOneInstance(instance))); - } - 
SettableFuture<FragInstanceDispatchResult> resultFuture = SettableFuture.create(); - for (Future<Boolean> future : futures) { - try { - Boolean success = future.get(); - if (!success) { - resultFuture.set(new FragInstanceDispatchResult(false)); - break; - } - } catch (ExecutionException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - resultFuture.setException(e); - break; - } - } - resultFuture.set(new FragInstanceDispatchResult(true)); - return resultFuture; - } - private Future<FragInstanceDispatchResult> dispatchWriteSync(List<FragmentInstance> instances) { - boolean result = true; - try { - for (FragmentInstance instance : instances) { - - if (!dispatchOneInstance(instance)) { - result = false; - break; - } + for (FragmentInstance instance : instances) { + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + dispatchOneInstance(instance); + } catch (FragmentInstanceDispatchException e) { + return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); + } catch (Throwable t) { + logger.error("cannot dispatch FI for write operation", t); + return immediateFuture( + new FragInstanceDispatchResult( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage()))); } - return immediateFuture(new FragInstanceDispatchResult(result)); - } catch (FragmentInstanceDispatchException e) { - logger.error("cannot dispatch FI for write operation", e); - return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); } + return immediateFuture(new FragInstanceDispatchResult(true)); } - private boolean dispatchOneInstance(FragmentInstance instance) + private void dispatchOneInstance(FragmentInstance instance) throws FragmentInstanceDispatchException { - try (SetThreadName fragmentInstanceName = new SetThreadName(instance.getId().getFullId())) { - TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); 
- if (isDispatchedToLocal(endPoint)) { - return dispatchLocally(instance); - } else { - return dispatchRemote(instance, endPoint); - } + TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); + if (isDispatchedToLocal(endPoint)) { + dispatchLocally(instance); + } else { + dispatchRemote(instance, endPoint); } } @@ -170,7 +146,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port; } - private boolean dispatchRemote(FragmentInstance instance, TEndPoint endPoint) + private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) throws FragmentInstanceDispatchException { try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { @@ -182,7 +158,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { instance.getRegionReplicaSet().getRegionId()); TSendFragmentInstanceResp sendFragmentInstanceResp = client.sendFragmentInstance(sendFragmentInstanceReq); - return sendFragmentInstanceResp.accepted; + if (!sendFragmentInstanceResp.accepted) { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + } case WRITE: TSendPlanNodeReq sendPlanNodeReq = new TSendPlanNodeReq( @@ -193,36 +173,67 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { logger.error(sendPlanNodeResp.getStatus().message); throw new FragmentInstanceDispatchException(sendPlanNodeResp.getStatus()); } - return true; } } catch (IOException | TException e) { logger.error("can't connect to node {}", endPoint, e); - throw new FragmentInstanceDispatchException(e); + TSStatus status = new TSStatus(); + status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode()); + status.setMessage("can't connect to node {}" + endPoint); + throw new 
FragmentInstanceDispatchException(status); } - return false; } - private boolean dispatchLocally(FragmentInstance instance) - throws FragmentInstanceDispatchException { - ConsensusGroupId groupId = - ConsensusGroupId.Factory.createFromTConsensusGroupId( - instance.getRegionReplicaSet().getRegionId()); + private void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException { + // deserialize ConsensusGroupId + ConsensusGroupId groupId; + try { + groupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId( + instance.getRegionReplicaSet().getRegionId()); + } catch (Throwable t) { + logger.error("Deserialize ConsensusGroupId failed. ", t); + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, + "Deserialize ConsensusGroupId failed: " + t.getMessage())); + } + switch (instance.getType()) { case READ: + // execute fragment instance in state machine ConsensusReadResponse readResponse; - if (groupId instanceof DataRegionId) { - readResponse = DataRegionConsensusImpl.getInstance().read(groupId, instance); - } else { - readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance); + try { + if (groupId instanceof DataRegionId) { + readResponse = DataRegionConsensusImpl.getInstance().read(groupId, instance); + } else { + readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, instance); + } + } catch (Throwable t) { + logger.error("Execute FragmentInstance in ConsensusGroup {} failed.", groupId, t); + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, + "Execute FragmentInstance failed. " + t.getMessage())); } if (!readResponse.isSuccess()) { logger.error( - "dispatch FragmentInstance {} locally failed because {}", + "dispatch FragmentInstance {} locally failed. 
", instance, readResponse.getException()); - return false; + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, + "Execute FragmentInstance failed: " + + (readResponse.getException() == null + ? "" + : readResponse.getException().getMessage()))); + } + FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset(); + if (info.getState().isFailed()) { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage())); } - return !((FragmentInstanceInfo) readResponse.getDataset()).getState().isFailed(); + return; case WRITE: PlanNode planNode = instance.getFragment().getRoot(); boolean hasFailedMeasurement = false; @@ -258,11 +269,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { RpcUtils.getStatus( TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); } - - return true; + return; } - throw new UnsupportedOperationException( - String.format("unknown query type [%s]", instance.getType())); + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.INTERNAL_SERVER_ERROR, + String.format("unknown query type [%s]", instance.getType()))); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java index eaa55d1d3f..2fc7ad4976 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/handler/QueryDataSetHandler.java @@ -17,6 +17,7 @@ package org.apache.iotdb.db.protocol.mpprest.handler; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.mpp.common.header.DatasetHeader; import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -47,7 +48,7 @@ public class QueryDataSetHandler { */ public static Response fillQueryDataSet( IQueryExecution queryExecution, Statement statement, int actualRowSizeLimit) - throws IOException { + throws IOException, IoTDBException { if (statement instanceof ShowStatement) { return fillShowPlanDataSet(queryExecution, actualRowSizeLimit); } else if (statement instanceof QueryStatement) { @@ -67,7 +68,8 @@ public class QueryDataSetHandler { } public static Response fillDataSetWithTimestamps( - IQueryExecution queryExecution, final int actualRowSizeLimit, final long timePrecision) { + IQueryExecution queryExecution, final int actualRowSizeLimit, final long timePrecision) + throws IoTDBException { org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); DatasetHeader header = queryExecution.getDatasetHeader(); @@ -81,7 +83,7 @@ public class QueryDataSetHandler { } public static Response fillAggregationPlanDataSet( - IQueryExecution queryExecution, final int actualRowSizeLimit) { + IQueryExecution queryExecution, final int actualRowSizeLimit) throws IoTDBException { org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); @@ -97,7 +99,7 @@ public class QueryDataSetHandler { } private static Response fillShowPlanDataSet( - IQueryExecution queryExecution, final int actualRowSizeLimit) throws IOException { + IQueryExecution queryExecution, final int actualRowSizeLimit) throws IoTDBException { org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet = new org.apache.iotdb.db.protocol.rest.model.QueryDataSet(); initTargetDatasetOrderByOrderWithSourceDataSet( @@ -135,7 +137,8 @@ public class QueryDataSetHandler { IQueryExecution queryExecution, int actualRowSizeLimit, org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet, - final long timePrecision) { + final long timePrecision) + throws IoTDBException { int 
fetched = 0; int columnNum = queryExecution.getOutputValueColumnCount(); while (fetched < actualRowSizeLimit) { @@ -177,7 +180,8 @@ public class QueryDataSetHandler { private static Response fillQueryDataSetWithoutTimestamps( IQueryExecution queryExecution, int actualRowSizeLimit, - org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) { + org.apache.iotdb.db.protocol.rest.model.QueryDataSet targetDataSet) + throws IoTDBException { int fetched = 0; int columnNum = queryExecution.getOutputValueColumnCount(); while (fetched < actualRowSizeLimit) { @@ -210,7 +214,7 @@ public class QueryDataSetHandler { } public static Response fillGrafanaVariablesResult( - IQueryExecution queryExecution, Statement statement) { + IQueryExecution queryExecution, Statement statement) throws IoTDBException { List<String> results = new ArrayList<>(); Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); if (!optionalTsBlock.isPresent()) { @@ -232,7 +236,8 @@ public class QueryDataSetHandler { return Response.ok().entity(results).build(); } - public static Response fillGrafanaNodesResult(IQueryExecution queryExecution) throws IOException { + public static Response fillGrafanaNodesResult(IQueryExecution queryExecution) + throws IoTDBException { List<String> nodes = new ArrayList<>(); Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); if (!optionalTsBlock.isPresent()) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 844ffa66ad..550ef32f60 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -545,7 +545,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (s == null) { return RpcUtils.getTSExecuteStatementResp( RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "This operation type is not supported")); + TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); } // permission check TSStatus status = AuthorityChecker.checkAuthority(s, req.sessionId); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 16b04f4e7b..048d791e39 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -92,6 +92,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.exception.NotImplementedException; +import com.google.common.collect.ImmutableList; import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -118,26 +119,63 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) { LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId()); - ConsensusGroupId groupId = - ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); - ConsensusReadResponse readResponse; + + // deserialize ConsensusGroupId + ConsensusGroupId groupId; + try { + groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); + } catch (Throwable t) { + LOGGER.error("Deserialize ConsensusGroupId failed. 
", t); + TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false); + resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage()); + return resp; + } + // We deserialize here instead of the underlying state machine because parallelism is possible // here but not at the underlying state machine - FragmentInstance fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body); - if (groupId instanceof DataRegionId) { - readResponse = DataRegionConsensusImpl.getInstance().read(groupId, fragmentInstance); - } else { - readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, fragmentInstance); + FragmentInstance fragmentInstance; + try { + fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body); + } catch (Throwable t) { + LOGGER.error("Deserialize FragmentInstance failed.", t); + TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false); + resp.setMessage("Deserialize FragmentInstance failed: " + t.getMessage()); + return resp; } - if (!readResponse.isSuccess()) { + + // execute fragment instance in state machine + ConsensusReadResponse readResponse; + try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) { + if (groupId instanceof DataRegionId) { + readResponse = DataRegionConsensusImpl.getInstance().read(groupId, fragmentInstance); + } else { + readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, fragmentInstance); + } + TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(); + if (!readResponse.isSuccess()) { + LOGGER.error( + "Execute FragmentInstance in ConsensusGroup {} failed.", + req.getConsensusGroupId(), + readResponse.getException()); + resp.setAccepted(false); + resp.setMessage( + "Execute FragmentInstance failed: " + + (readResponse.getException() == null + ? 
"" + : readResponse.getException().getMessage())); + } else { + FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset(); + resp.setAccepted(!info.getState().isFailed()); + resp.setMessage(info.getMessage()); + } + return resp; + } catch (Throwable t) { LOGGER.error( - "execute FragmentInstance in ConsensusGroup {} failed because {}", - req.getConsensusGroupId(), - readResponse.getException()); - return new TSendFragmentInstanceResp(false); + "Execute FragmentInstance in ConsensusGroup {} failed.", req.getConsensusGroupId(), t); + TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false); + resp.setMessage("Execute FragmentInstance failed: " + t.getMessage()); + return resp; } - FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset(); - return new TSendFragmentInstanceResp(!info.getState().isFailed()); } @Override @@ -204,11 +242,15 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) { FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId); - try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) { - FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId); - return info != null - ? 
new TFragmentInstanceStateResp(info.getState().toString()) - : new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString()); + FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId); + if (info != null) { + TFragmentInstanceStateResp resp = new TFragmentInstanceStateResp(info.getState().toString()); + if (info.getMessage() != null) { + resp.setFailedMessages(ImmutableList.of(info.getMessage())); + } + return resp; + } else { + return new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 3b8d2bd608..f7e9640455 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.utils; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; @@ -178,7 +179,7 @@ public class QueryDataSetUtils { } public static TSQueryDataSet convertTsBlockByFetchSize( - IQueryExecution queryExecution, int fetchSize) throws IOException { + IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException { int columnNum = queryExecution.getOutputValueColumnCount(); TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); // one time column and each value column has an actual value buffer and a bitmap value to diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift index 3ceeefb0c4..b0371aafc6 100644 --- a/thrift/src/main/thrift/datanode.thrift +++ b/thrift/src/main/thrift/datanode.thrift @@ -116,6 +116,7 @@ struct TFetchFragmentInstanceStateReq { // TODO: need to supply more fields according to 
implementation struct TFragmentInstanceStateResp { 1: required string state + 2: optional list<string> failedMessages } struct TCancelQueryReq {
