This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch sync-huge-tsfile in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e0a4c8d3c3fab840fd8e94b949120a1651e687c8 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed May 8 15:10:00 2024 +0800 Load: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process --- .../execution/load/LoadTsFileManager.java | 65 +++++-- .../scheduler/load/LoadTsFileDispatcherImpl.java | 190 ++++++++++++--------- .../iotdb/db/storageengine/StorageEngine.java | 3 +- 3 files changed, 168 insertions(+), 90 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index ef4ed7f16b9..b568b887988 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -54,6 +54,7 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -104,6 +105,13 @@ public class LoadTsFileManager { final CleanupTask cleanupTask = cleanupTaskQueue.peek(); if (cleanupTask.scheduledTime <= System.currentTimeMillis()) { + if (cleanupTask.isLoadTaskRunning) { + cleanupTaskQueue.poll(); + cleanupTask.resetScheduledTime(); + cleanupTaskQueue.add(cleanupTask); + continue; + } + cleanupTask.run(); uuid2CleanupTask.remove(cleanupTask.uuid); @@ -157,17 +165,24 @@ public class LoadTsFileManager { } } - final TsFileWriterManager writerManager = - uuid2WriterManager.computeIfAbsent( - uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid))); - for (TsFileData tsFileData : pieceNode.getAllTsFileData()) { - if (!tsFileData.isModification()) { - ChunkData chunkData = (ChunkData) tsFileData; - writerManager.write( - new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData); - } else { - writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData); + final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid)); + cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); + try { + final TsFileWriterManager writerManager = + uuid2WriterManager.computeIfAbsent( + uuid, + o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid))); + for (TsFileData tsFileData : pieceNode.getAllTsFileData()) { + if (!tsFileData.isModification()) { + ChunkData chunkData = (ChunkData) tsFileData; + writerManager.write( + new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData); + } else { + writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData); + } } + } finally { + cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning); } } @@ -176,7 +191,15 @@ public class LoadTsFileManager { if (!uuid2WriterManager.containsKey(uuid)) { return false; } - uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex); + + final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid)); + cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); + try { + uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex); + } finally { + cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning); + } + clean(uuid); return true; } @@ -412,12 +435,30 @@ public class LoadTsFileManager { private class CleanupTask implements Runnable, Comparable<CleanupTask> { private final String uuid; - private final long scheduledTime; + private final long delayInMs; + private long scheduledTime; + + private volatile boolean isLoadTaskRunning = false; private volatile boolean isCanceled = false; private CleanupTask(String uuid, long delayInMs) { this.uuid = uuid; + this.delayInMs = delayInMs; + resetScheduledTime(); + } + + public void markLoadTaskRunning() { + isLoadTaskRunning = true; + resetScheduledTime(); + } + + public void markLoadTaskNotRunning() { + isLoadTaskRunning = false; + resetScheduledTime(); + } + + public void resetScheduledTime() { scheduledTime = System.currentTimeMillis() + delayInMs; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index 0d5f0bb9794..83a12e0d637 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -51,23 +50,29 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import io.airlift.concurrent.SetThreadName; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.util.concurrent.Futures.immediateFuture; public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class); + private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day + private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS = + new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()); + private String uuid; private final String localhostIpAddr; private final int localhostInternalPort; @@ -76,8 +81,6 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { private final ExecutorService executor; private final boolean isGeneratedByPipe; - private static final String NODE_CONNECTION_ERROR = "can't connect to node {}"; - public LoadTsFileDispatcherImpl( IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager, boolean isGeneratedByPipe) { @@ -137,78 +140,6 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { } } - private boolean isDispatchedToLocal(TEndPoint endPoint) { - return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port; - } - - private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint) - throws FragmentInstanceDispatchException { - try (SyncDataNodeInternalServiceClient client = - internalServiceClientManager.borrowClient(endPoint)) { - TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq); - if (!loadResp.isAccepted()) { - LOGGER.warn(loadResp.message); - throw new FragmentInstanceDispatchException(loadResp.status); - } - } catch (ClientManagerException | TException e) { - String warning = NODE_CONNECTION_ERROR; - LOGGER.warn(warning, endPoint, e); - TSStatus status = new TSStatus(); - status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); - status.setMessage(warning + endPoint); - throw new FragmentInstanceDispatchException(status); - } - } - - private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint) - throws FragmentInstanceDispatchException { - try (SyncDataNodeInternalServiceClient client = - internalServiceClientManager.borrowClient(endPoint)) { - TLoadResp loadResp = client.sendLoadCommand(loadCommandReq); - if (!loadResp.isAccepted()) { - LOGGER.warn(loadResp.message); - throw new FragmentInstanceDispatchException(loadResp.status); - } - } catch (ClientManagerException | TException e) { - LOGGER.warn(NODE_CONNECTION_ERROR, endPoint, e); - TSStatus status = new TSStatus(); - status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); - status.setMessage( - "can't connect to node " - + endPoint - + ", please reset longer dn_connection_timeout_ms " - + "in iotdb-datanode.properties and restart iotdb."); - throw new FragmentInstanceDispatchException(status); - } - } - - private void dispatchLocally(TLoadCommandReq loadCommandReq) - throws FragmentInstanceDispatchException { - final ProgressIndex progressIndex; - if (loadCommandReq.isSetProgressIndex()) { - progressIndex = - ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex())); - } else { - // fallback to use local generated progress index for compatibility - progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad(); - LOGGER.info( - "Use local generated load progress index {} for uuid {}.", - progressIndex, - loadCommandReq.uuid); - } - - final TSStatus resultStatus = - StorageEngine.getInstance() - .executeLoadCommand( - LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType], - loadCommandReq.uuid, - loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe, - progressIndex); - if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) { - throw new FragmentInstanceDispatchException(resultStatus); - } - } - public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException { LOGGER.info("Receive load node from uuid {}.", uuid); @@ -259,6 +190,32 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { } } + private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint) + throws FragmentInstanceDispatchException { + try (SyncDataNodeInternalServiceClient client = + internalServiceClientManager.borrowClient(endPoint)) { + client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get()); + + final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq); + if (!loadResp.isAccepted()) { + LOGGER.warn(loadResp.message); + throw new FragmentInstanceDispatchException(loadResp.status); + } + } catch (Exception e) { + adjustTimeoutIfNecessary(e); + + final String exceptionMessage = + String.format( + "failed to dispatch load command %s to node %s because of exception: %s", + loadTsFileReq, endPoint, e); + LOGGER.warn(exceptionMessage, e); + throw new FragmentInstanceDispatchException( + new TSStatus() + .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()) + .setMessage(exceptionMessage)); + } + } + public Future<FragInstanceDispatchResult> dispatchCommand( TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) { Set<TEndPoint> allEndPoint = new HashSet<>(); @@ -290,6 +247,87 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { return immediateFuture(new FragInstanceDispatchResult(true)); } + private void dispatchLocally(TLoadCommandReq loadCommandReq) + throws FragmentInstanceDispatchException { + final ProgressIndex progressIndex; + if (loadCommandReq.isSetProgressIndex()) { + progressIndex = + ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex())); + } else { + // fallback to use local generated progress index for compatibility + progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad(); + LOGGER.info( + "Use local generated load progress index {} for uuid {}.", + progressIndex, + loadCommandReq.uuid); + } + + final TSStatus resultStatus = + StorageEngine.getInstance() + .executeLoadCommand( + LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType], + loadCommandReq.uuid, + loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe, + progressIndex); + if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) { + throw new FragmentInstanceDispatchException(resultStatus); + } + } + + private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint) + throws FragmentInstanceDispatchException { + try (SyncDataNodeInternalServiceClient client = + internalServiceClientManager.borrowClient(endPoint)) { + client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get()); + + final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq); + if (!loadResp.isAccepted()) { + LOGGER.warn(loadResp.message); + throw new FragmentInstanceDispatchException(loadResp.status); + } + } catch (Exception e) { + adjustTimeoutIfNecessary(e); + + final String exceptionMessage = + String.format( + "failed to dispatch load command %s to node %s because of exception: %s", + loadCommandReq, endPoint, e); + LOGGER.warn(exceptionMessage, e); + throw new FragmentInstanceDispatchException( + new TSStatus() + .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()) + .setMessage(exceptionMessage)); + } + } + + private boolean isDispatchedToLocal(TEndPoint endPoint) { + return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port; + } + + private static void adjustTimeoutIfNecessary(Throwable e) { + do { + if (e instanceof SocketTimeoutException) { + int newConnectionTimeout; + try { + newConnectionTimeout = + Math.min( + Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), MAX_CONNECTION_TIMEOUT_MS); + } catch (ArithmeticException arithmeticException) { + newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS; + } + + if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) { + CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout); + LOGGER.info( + "Load remote procedure call connection timeout is adjusted to {} ms ({} mins)", + newConnectionTimeout, + newConnectionTimeout / 60000.0); + } + return; + } + } while ((e = e.getCause()) != null); + } + @Override public void abort() { // Do nothing diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 5bcff3443ec..c8c35e0958c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -41,7 +41,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor; import org.apache.iotdb.db.exception.DataRegionException; -import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.LoadReadOnlyException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TsFileProcessorException; @@ -903,7 +902,7 @@ public class StorageEngine implements IService { status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode()); status.setMessage(String.format("Wrong load command %s.", loadCommand)); } - } catch (IOException | LoadFileException e) { + } catch (Exception e) { LOGGER.error("Execute load command {} error.", loadCommand, e); status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); status.setMessage(e.getMessage());
