This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch HighAvailability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 396569ea003c503d2d4ea82e2382ed2f019d8f29 Author: JackieTien97 <[email protected]> AuthorDate: Mon Oct 23 16:33:09 2023 +0800 [IOTDB-6200] Change schema template to device template --- .../consensus/iot/client/DispatchLogHandler.java | 2 +- .../metadata/SeriesOverflowException.java | 5 +- .../plan/planner/plan/FragmentInstance.java | 11 +++ .../plan/scheduler/AsyncPlanNodeSender.java | 87 +++++++++++++++------- .../plan/scheduler/AsyncSendPlanNodeHandler.java | 27 ++++++- .../scheduler/FragmentInstanceDispatcherImpl.java | 26 +++++++ .../apache/iotdb/commons/client/ThriftClient.java | 2 +- 7 files changed, 127 insertions(+), 33 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index b851b29981b..193adc6668b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java @@ -69,7 +69,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime); } - private boolean needRetry(int statusCode) { + public static boolean needRetry(int statusCode) { return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode() || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java index f8def6004cf..72dd6b75cce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java @@ -28,10 +28,7 @@ public class SeriesOverflowException extends MetadataException { public SeriesOverflowException(long memoryUsage, long seriesNum) { super( String.format( - "There are too many timeseries in memory. " - + "Current memory usage is %s and series num is %s. " - + "Please increase MAX_HEAP_SIZE in datanode-env.sh/bat, " - + "restart and create timeseries again.", + "Too many timeseries in memory without device template(current memory: %s, series num: %s). To optimize memory, DEVICE TEMPLATE is more recommended when devices have same time series.", memoryUsage, seriesNum), TSStatusCode.SERIES_OVERFLOW.getStatusCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 9a211c8384a..fcef7b54a5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -74,6 +74,9 @@ public class FragmentInstance implements IConsensusRequest { private boolean isHighestPriority; + // indicate which index we are retrying + private transient int nextRetryIndex = 0; + // We can add some more params for a specific FragmentInstance // So that we can make different FragmentInstance owns different data range. 
@@ -271,6 +274,14 @@ public class FragmentInstance implements IConsensusRequest { return hostDataNode; } + public TDataNodeLocation getNextRetriedHostDataNode() { + nextRetryIndex = + (nextRetryIndex + 1) % executorType.getRegionReplicaSet().getDataNodeLocations().size(); + this.hostDataNode = + executorType.getRegionReplicaSet().getDataNodeLocations().get(nextRetryIndex); + return hostDataNode; + } + public long getTimeOut() { return timeOut; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java index 525397123ba..c97c10e99de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java @@ -35,15 +35,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; -import static com.google.common.util.concurrent.Futures.immediateFuture; - public class AsyncPlanNodeSender { private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class); @@ -53,8 +51,11 @@ public class AsyncPlanNodeSender { private final Map<TEndPoint, BatchRequestWithIndex> batchRequests; private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap; + + private final List<Integer> needRetryInstanceIndex; + private final AtomicLong pendingNumber; - private final long startSendTime; + private long startSendTime; public AsyncPlanNodeSender( IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> @@ -77,6 +78,7 @@ public class AsyncPlanNodeSender { 
instances.get(i).getRegionReplicaSet().getRegionId())); } this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size() + 1, 1); + this.needRetryInstanceIndex = Collections.synchronizedList(new ArrayList<>()); this.pendingNumber = new AtomicLong(batchRequests.keySet().size()); } @@ -84,7 +86,11 @@ public class AsyncPlanNodeSender { for (Map.Entry<TEndPoint, BatchRequestWithIndex> entry : batchRequests.entrySet()) { AsyncSendPlanNodeHandler handler = new AsyncSendPlanNodeHandler( - entry.getValue().getIndexes(), pendingNumber, instanceId2RespMap, startSendTime); + entry.getValue().getIndexes(), + pendingNumber, + instanceId2RespMap, + needRetryInstanceIndex, + startSendTime); try { AsyncDataNodeInternalServiceClient client = asyncInternalServiceClientManager.borrowClient(entry.getKey()); @@ -135,32 +141,61 @@ public class AsyncPlanNodeSender { return failureStatusList; } - public Future<FragInstanceDispatchResult> getResult() { - for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : instanceId2RespMap.entrySet()) { - if (!entry.getValue().accepted) { - logger.warn( - "dispatch write failed. status: {}, code: {}, message: {}, node {}", - entry.getValue().status, - TSStatusCode.representOf(entry.getValue().status.code), - entry.getValue().message, - instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint()); - if (entry.getValue().getStatus() == null) { - return immediateFuture( - new FragInstanceDispatchResult( - RpcUtils.getStatus( - TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()))); - } else { - return immediateFuture(new FragInstanceDispatchResult(entry.getValue().getStatus())); - } - } + public boolean needRetry() { + // retried FI list is not empty and data region replica number is greater than 1 + return !needRetryInstanceIndex.isEmpty() + && instances.get(0).getRegionReplicaSet().dataNodeLocations.size() > 1; + } + + /** + * This function should be called after all last batch responses were received. 
This function will + do the cleaning work and caller won't need to do the cleaning work outside this function. We + will retry all failed FIs in last batch whose id were all saved in needRetryInstanceIds, if + there are still failed FIs this time, they will be also saved in needRetryInstanceIds. + * + * <p>It's a sync function which means that once this function returned, the results of this retry + have been received, and you don't need to call waitUntilCompleted. + */ + public void retry() throws InterruptedException { + // 1. rebuild the batchRequests using remaining failed FIs, change the replica for each failed + // FI in this step + batchRequests.clear(); + for (int fragmentInstanceIndex : needRetryInstanceIndex) { + this.batchRequests + .computeIfAbsent( + instances + .get(fragmentInstanceIndex) + .getNextRetriedHostDataNode() + .getInternalEndPoint(), + x -> new BatchRequestWithIndex()) + .addSinglePlanNodeReq( + fragmentInstanceIndex, + new TSendSinglePlanNodeReq( + new TPlanNode( + instances + .get(fragmentInstanceIndex) + .getFragment() + .getPlanNodeTree() + .serializeToByteBuffer()), + instances.get(fragmentInstanceIndex).getRegionReplicaSet().getRegionId())); } - return immediateFuture(new FragInstanceDispatchResult(true)); + + // 2. reset the pendingNumber, needRetryInstanceIds and startSendTime + needRetryInstanceIndex.clear(); + pendingNumber.set(batchRequests.keySet().size()); + startSendTime = System.nanoTime(); + + // 3. call sendAll() to retry + sendAll(); + + // 4. call waitUntilCompleted() to wait for the responses + waitUntilCompleted(); } /** * This class is used to aggregate PlanNode of the same - * the one-to-one correspondence between response and request, the corresponding index needs to + * be recorded. 
*/ static class BatchRequestWithIndex { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java index 63af1ffedb9..dea6a162655 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java @@ -20,22 +20,27 @@ package org.apache.iotdb.db.queryengine.plan.scheduler; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.consensus.iot.client.DispatchLogHandler; import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp; import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.thrift.async.AsyncMethodCallback; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.iotdb.commons.client.ThriftClient.isConnectionBroken; + public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendBatchPlanNodeResp> { private final List<Integer> instanceIds; private final AtomicLong pendingNumber; private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap; + private final List<Integer> needRetryInstanceIndex; private final long sendTime; private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); @@ -44,17 +49,23 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendBatchP List<Integer> instanceIds, AtomicLong pendingNumber, Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap, + List<Integer> needRetryInstanceIndex, long sendTime) { this.instanceIds = 
instanceIds; this.pendingNumber = pendingNumber; this.instanceId2RespMap = instanceId2RespMap; + this.needRetryInstanceIndex = needRetryInstanceIndex; this.sendTime = sendTime; } @Override public void onComplete(TSendBatchPlanNodeResp sendBatchPlanNodeResp) { for (int i = 0; i < sendBatchPlanNodeResp.getResponses().size(); i++) { - instanceId2RespMap.put(instanceIds.get(i), sendBatchPlanNodeResp.getResponses().get(i)); + TSendSinglePlanNodeResp singlePlanNodeResp = sendBatchPlanNodeResp.getResponses().get(i); + instanceId2RespMap.put(instanceIds.get(i), singlePlanNodeResp); + if (needRetry(singlePlanNodeResp)) { + needRetryInstanceIndex.add(instanceIds.get(i)); + } } if (pendingNumber.decrementAndGet() == 0) { PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime); @@ -66,6 +77,9 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendBatchP @Override public void onError(Exception e) { + if (needRetry(e)) { + needRetryInstanceIndex.addAll(instanceIds); + } TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp(); String errorMsg = String.format("Fail to send plan node, exception message: %s", e); resp.setAccepted(false); @@ -80,4 +94,15 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendBatchP } } } + + private boolean needRetry(Exception e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + // if the exception is SocketException and its error message is Broken pipe, it means that the + // remote node may go offline + return isConnectionBroken(rootCause); + } + + private boolean needRetry(TSendSinglePlanNodeResp resp) { + return DispatchLogHandler.needRetry(resp.status.code); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 2e8929402d1..0e698c7b81c 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; @@ -210,6 +211,25 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { // wait until remote dispatch done try { asyncPlanNodeSender.waitUntilCompleted(); + + if (asyncPlanNodeSender.needRetry()) { + // retry failed remote FIs + int retry = 0; + final int maxRetryTimes = 10; + long waitMillis = getRetrySleepTime(retry); + + while (asyncPlanNodeSender.needRetry()) { + retry++; + asyncPlanNodeSender.retry(); + if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) { + break; + } + // still need to retry, sleep some time before make another retry. 
+ Thread.sleep(waitMillis); + waitMillis = getRetrySleepTime(retry); + } + } + } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Interrupted when dispatching write async", e); @@ -239,6 +259,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } } + private long getRetrySleepTime(int retryTimes) { + return Math.min( + (long) (TimeUnit.MILLISECONDS.toMillis(100) * Math.pow(2, retryTimes)), + TimeUnit.SECONDS.toMillis(20)); + } + private void dispatchOneInstance(FragmentInstance instance) throws FragmentInstanceDispatchException { TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java index e9d3b9c36ce..f57015cfab3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java @@ -102,7 +102,7 @@ public interface ThriftClient { * @param cause Throwable * @return true/false */ - static boolean isConnectionBroken(Throwable cause) { + public static boolean isConnectionBroken(Throwable cause) { return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe")) || (cause instanceof TTransportException && cause.getMessage().contains("Socket is closed by peer"));
