This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-source-reboot
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04ff1111fdf28f1c3d6466db8f238cbe41f59f12 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 3 06:43:54 2023 +0800 fix --- .../iotdb/confignode/manager/node/NodeManager.java | 4 +- .../consensus/index/impl/SimpleProgressIndex.java | 2 +- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 51 +++++++++++++++------- .../core/event/realtime/TsFileEpochManager.java | 6 +-- .../db/sync/common/ClusterSyncInfoFetcher.java | 13 +----- 5 files changed, 41 insertions(+), 35 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2baa07f1973..0276a5d7b4c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -206,8 +206,8 @@ public class NodeManager { } private TRuntimeConfiguration getRuntimeConfiguration() { - getPipeManager().getPipePluginCoordinator().lock(); getPipeManager().getPipeTaskCoordinator().lock(); + getPipeManager().getPipePluginCoordinator().lock(); getTriggerManager().getTriggerInfo().acquireTriggerTableLock(); getUDFManager().getUdfInfo().acquireUDFTableLock(); @@ -226,8 +226,8 @@ public class NodeManager { } finally { getTriggerManager().getTriggerInfo().releaseTriggerTableLock(); getUDFManager().getUdfInfo().releaseUDFTableLock(); - getPipeManager().getPipeTaskCoordinator().unlock(); getPipeManager().getPipePluginCoordinator().unlock(); + getPipeManager().getPipeTaskCoordinator().unlock(); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java index eaa80096823..9ce9890ecb4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java +++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java @@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex { // thatSimpleProgressIndex.memtableFlushOrderId return this; } finally { - lock.writeLock().lock(); + lock.writeLock().unlock(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index d086e6e71ea..d21d8ed5d4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -94,11 +95,11 @@ public class PipeTaskAgent { // if pipe meta does not exist on data node, create a new pipe if (metaOnDataNode == null) { - createPipe(metaFromConfigNode); - if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) { + if (createPipe(metaFromConfigNode)) { + // if the status recorded in config node is RUNNING, start the pipe startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime()); } - // if the status is STOPPED or DROPPED, do nothing + // if the status recorded in config node is STOPPED or DROPPED, do nothing continue; } @@ -109,8 +110,7 @@ public class PipeTaskAgent { // first check if pipe static meta has changed, if so, drop the pipe and create a new one if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) { dropPipe(pipeName); - createPipe(metaFromConfigNode); - if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) { + if (createPipe(metaFromConfigNode)) { startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime()); } // if the status is STOPPED or DROPPED, do nothing @@ -245,9 
+245,17 @@ public class PipeTaskAgent { ////////////////////////// Manage by Pipe Name ////////////////////////// - private void createPipe(PipeMeta pipeMeta) { - final String pipeName = pipeMeta.getStaticMeta().getPipeName(); - final long creationTime = pipeMeta.getStaticMeta().getCreationTime(); + /** + * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create + * the pipe and return true. + * + * @param pipeMetaFromConfigNode pipe meta from config node + * @return true if the pipe is created successfully and should be started, false if the pipe + * already exists or is created but should not be started + */ + private boolean createPipe(PipeMeta pipeMetaFromConfigNode) { + final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName(); + final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime(); final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); if (existedPipeMeta != null) { @@ -260,7 +268,7 @@ public class PipeTaskAgent { pipeName, creationTime, existedPipeMeta.getRuntimeMeta().getStatus().get().name()); - return; + return false; case DROPPED: LOGGER.info( "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. " @@ -284,16 +292,25 @@ public class PipeTaskAgent { } // create pipe tasks and trigger create() method for each pipe task - final Map<TConsensusGroupId, PipeTask> pipeTasks = new PipeBuilder(pipeMeta).build(); + final Map<TConsensusGroupId, PipeTask> pipeTasks = + new PipeBuilder(pipeMetaFromConfigNode).build(); for (PipeTask pipeTask : pipeTasks.values()) { pipeTask.create(); } - pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks); + pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks); + + // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status + // of pipe meta to STOPPED when it is created. 
The STOPPED status should always be the initial + // status of a pipe, which makes the status transition logic simpler. + final AtomicReference<PipeStatus> pipeStatusFromConfigNode = + pipeMetaFromConfigNode.getRuntimeMeta().getStatus(); + final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING; + pipeStatusFromConfigNode.set(PipeStatus.STOPPED); + + pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode); - // add pipe meta to pipe meta keeper - // note that we do not need to set the status of pipe meta here, because the status of pipe meta - // is already set to STOPPED when it is created - pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta); + // If the pipe status from config node is RUNNING, we will start the pipe later. + return needToStartPipe; } private void dropPipe(String pipeName, long creationTime) { @@ -551,7 +568,8 @@ public class PipeTaskAgent { public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp) throws TException { - if (!req.isNeedPipeMetaList()) { + // do nothing if data node is removing or removed, or request does not need pipe meta list + if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) { return; } @@ -559,6 +577,7 @@ public class PipeTaskAgent { try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + LOGGER.info("Reporting pipe meta: {}", pipeMeta); } } catch (IOException e) { throw new TException(e); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java index 1dcc1fb9320..3e09b86e74f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java @@ -46,15 +46,13 @@ public class TsFileEpochManager { // this would not happen, but 
just in case if (!filePath2Epoch.containsKey(filePath)) { - LOGGER.warn( - String.format("PipeEngine: can not find TsFileEpoch for TsFile %s, create it", filePath)); + LOGGER.info( + String.format("Pipe: can not find TsFileEpoch for TsFile %s, creating it", filePath)); filePath2Epoch.put(filePath, new TsFileEpoch(filePath)); } return new PipeRealtimeCollectEvent( event, - // TODO: we have to make sure that the TsFileInsertionEvent is the last event of the - // TsFileEpoch's life cycle filePath2Epoch.remove(filePath), resource.getDevices().stream() .collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)), diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java index 929a7ce4f47..f69f55cd6f7 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.exception.sync.PipeSinkException; import org.apache.iotdb.commons.sync.pipe.PipeInfo; import org.apache.iotdb.commons.sync.pipe.PipeMessage; import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; import org.apache.iotdb.db.client.ConfigNodeClient; @@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; /** Only fetch read request. For write request, return SUCCESS directly. 
*/ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher { @@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher { @Override public List<PipeInfo> getAllPipeInfos() { - try (ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo(); - return resp.getAllPipeInfo().stream() - .map(PipeInfo::deserializePipeInfo) - .collect(Collectors.toList()); - } catch (Exception e) { - LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e); - return Collections.emptyList(); - } + return Collections.emptyList(); } @Override
