This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d78f1a915f [IOTDB-5968] Pipe: pipe task does not work properly after
cluster reboot (#10046)
5d78f1a915f is described below
commit 5d78f1a915f3c51c637a26e2c691ee07d5aefa2b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Jun 4 01:23:14 2023 +0800
[IOTDB-5968] Pipe: pipe task does not work properly after cluster reboot
(#10046)
* fix: dead lock issue found in NodeManager.java
* fix: dead lock issue found in SimpleProgressIndex.java
* fix: logic error of 'pipe task start' found in PipeTaskAgent.java , which
may cause data consistency issues
* fix: NPE error from ClusterSyncInfoFetcher.java
* improvement: reduce some exception log print
* improvement: introduce some log print for progress index reporting and
recovering
---
.../iotdb/confignode/manager/node/NodeManager.java | 9 +++-
.../confignode/persistence/pipe/PipeTaskInfo.java | 9 ++--
.../runtime/PipeHandleLeaderChangeProcedure.java | 8 ++--
.../runtime/PipeHandleMetaChangeProcedure.java | 24 +++++++---
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 8 ++--
.../pipe/task/AbstractOperatePipeProcedureV2.java | 8 ++++
.../consensus/index/impl/SimpleProgressIndex.java | 2 +-
.../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 5 ++-
.../db/pipe/agent/runtime/HeartbeatScheduler.java | 23 ----------
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 12 +++--
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 51 +++++++++++++++-------
.../core/event/realtime/TsFileEpochManager.java | 6 +--
.../db/sync/common/ClusterSyncInfoFetcher.java | 13 +-----
13 files changed, 98 insertions(+), 80 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2baa07f1973..8dcf95789d8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -206,8 +206,10 @@ public class NodeManager {
}
private TRuntimeConfiguration getRuntimeConfiguration() {
- getPipeManager().getPipePluginCoordinator().lock();
+ // getPipeTaskCoordinator.lock() should be called outside the
getPipePluginCoordinator().lock()
+ // to avoid deadlock
getPipeManager().getPipeTaskCoordinator().lock();
+ getPipeManager().getPipePluginCoordinator().lock();
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
getUDFManager().getUdfInfo().acquireUDFTableLock();
@@ -226,8 +228,11 @@ public class NodeManager {
} finally {
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
getUDFManager().getUdfInfo().releaseUDFTableLock();
- getPipeManager().getPipeTaskCoordinator().unlock();
getPipeManager().getPipePluginCoordinator().unlock();
+ // getPipeTaskCoordinator.unlock() should be called outside the
+ // getPipePluginCoordinator().unlock()
+ // to avoid deadlock
+ getPipeManager().getPipeTaskCoordinator().unlock();
}
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 2c9485c29fc..8aaa0948f80 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -223,22 +223,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
dataRegionGroupId,
new PipeTaskMeta(
new MinimumProgressIndex(),
newDataRegionLeader));
- } else {
- LOGGER.warn(
- "The pipe task meta does not contain the
data region group {} or the data region group has already been removed",
- dataRegionGroupId);
}
+ // else:
+ // "The pipe task meta does not contain the data
region group {} or
+ // the data region group has already been removed"
}
}));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+ LOGGER.info("Handling pipe meta changes ...");
pipeMetaKeeper.clear();
plan.getPipeMetaList()
.forEach(
pipeMeta -> {
pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta);
+ LOGGER.info("Recording pipe meta: {}", pipeMeta);
});
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 0c218184b5c..3cbcebe6f6b 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -100,10 +100,10 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure:
executeFromHandleOnDataNodes");
- pushPipeMetaToDataNodes(env);
+ pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
@@ -142,10 +142,10 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
}
@Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure:
rollbackFromCreateOnDataNodes");
- pushPipeMetaToDataNodes(env);
+ pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 8f9f9207ebc..39948e9fbb9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
.getValue()
.getProgressIndex()
.isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
- runtimeMetaOnConfigNode
- .getValue()
- .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+ LOGGER.info(
+ "Updating progress index for (pipe name: {}, consensus group id:
{}) ... Progress index on config node: {}, progress index from data node: {}",
+ pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+ runtimeMetaOnConfigNode.getKey(),
+ runtimeMetaOnConfigNode.getValue().getProgressIndex(),
+ runtimeMetaFromDataNode.getProgressIndex());
+ LOGGER.info(
+ "Progress index for (pipe name: {}, consensus group id: {}) is
updated to {}",
+ pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
+ runtimeMetaOnConfigNode.getKey(),
+ runtimeMetaOnConfigNode
+ .getValue()
+
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
+
needWriteConsensusOnConfigNodes = true;
}
@@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
pipeTaskMetaOnConfigNode.clearExceptionMessages();
for (final PipeRuntimeException exception :
runtimeMetaFromDataNode.getExceptionMessages()) {
+
pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
if (exception instanceof PipeRuntimeCriticalException) {
@@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
.get()
.equals(PipeStatus.STOPPED)) {
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+
needWriteConsensusOnConfigNodes = true;
needPushPipeMetaToDataNodes = true;
@@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
.forEach(
status -> {
status.set(PipeStatus.STOPPED);
+
needWriteConsensusOnConfigNodes = true;
needPushPipeMetaToDataNodes = true;
@@ -224,14 +238,14 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
if (!needPushPipeMetaToDataNodes) {
return;
}
- pushPipeMetaToDataNodes(env);
+ pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 167008e2ed3..a7faa989ab9 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -66,10 +66,10 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
- pushPipeMetaToDataNodes(env);
+ pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
@@ -94,10 +94,10 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
}
@Override
- protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws IOException {
+ protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
- pushPipeMetaToDataNodes(env);
+ // do nothing
}
@Override
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
index 15065499562..133a74854be 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java
@@ -200,6 +200,14 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
+ protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv
env) {
+ try {
+ pushPipeMetaToDataNodes(env);
+ } catch (Throwable throwable) {
+ LOGGER.info("Failed to push pipe meta list to data nodes, will retry
later.", throwable);
+ }
+ }
+
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index eaa80096823..9ce9890ecb4 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex {
// thatSimpleProgressIndex.memtableFlushOrderId
return this;
} finally {
- lock.writeLock().lock();
+ lock.writeLock().unlock();
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index 46547704d08..43330fa392b 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -52,8 +52,9 @@ public class PipeTaskMeta {
return progressIndex.get();
}
- public void updateProgressIndex(ProgressIndex updateIndex) {
- progressIndex.updateAndGet(index ->
index.updateToMinimumIsAfterProgressIndex(updateIndex));
+ public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
+ return progressIndex.updateAndGet(
+ index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
}
public int getLeaderDataNodeId() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
deleted file mode 100644
index de3f30f388b..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.agent.runtime;
-
-/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */
-public class HeartbeatScheduler {}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index adf15709aff..16f3716bc13 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -163,13 +163,19 @@ class PipeAgentLauncher {
PipeAgent.task()
.handlePipeMetaChanges(
getAllPipeInfoResp.getAllPipeInfo().stream()
- .map(PipeMeta::deserialize)
+ .map(
+ byteBuffer -> {
+ final PipeMeta pipeMeta =
PipeMeta.deserialize(byteBuffer);
+ LOGGER.info(
+ "Pulled pipe meta from config node: {}, recovering
...", pipeMeta);
+ return pipeMeta;
+ })
.collect(Collectors.toList()));
- } catch (Throwable throwable) {
+ } catch (Exception e) {
LOGGER.info(
"Failed to get pipe task meta from config node. Ignore the
exception, "
+ "because config node may not be ready yet, and meta will be
pushed by config node later.",
- throwable);
+ e);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d086e6e71ea..d21d8ed5d4e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -94,11 +95,11 @@ public class PipeTaskAgent {
// if pipe meta does not exist on data node, create a new pipe
if (metaOnDataNode == null) {
- createPipe(metaFromConfigNode);
- if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
+ if (createPipe(metaFromConfigNode)) {
+ // if the status recorded in config node is RUNNING, start the pipe
startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
}
- // if the status is STOPPED or DROPPED, do nothing
+ // if the status recorded in config node is STOPPED or DROPPED, do
nothing
continue;
}
@@ -109,8 +110,7 @@ public class PipeTaskAgent {
// first check if pipe static meta has changed, if so, drop the pipe and
create a new one
if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
dropPipe(pipeName);
- createPipe(metaFromConfigNode);
- if (metaFromConfigNode.getRuntimeMeta().getStatus().get() ==
PipeStatus.RUNNING) {
+ if (createPipe(metaFromConfigNode)) {
startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
}
// if the status is STOPPED or DROPPED, do nothing
@@ -245,9 +245,17 @@ public class PipeTaskAgent {
////////////////////////// Manage by Pipe Name //////////////////////////
- private void createPipe(PipeMeta pipeMeta) {
- final String pipeName = pipeMeta.getStaticMeta().getPipeName();
- final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
+ /**
+ * Create a new pipe. If the pipe already exists, do nothing and return
false. Otherwise, create
+ * the pipe and return true.
+ *
+ * @param pipeMetaFromConfigNode pipe meta from config node
+ * @return true if the pipe is created successfully and should be started,
false if the pipe
+ * already exists or is created but should not be started
+ */
+ private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
+ final String pipeName =
pipeMetaFromConfigNode.getStaticMeta().getPipeName();
+ final long creationTime =
pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta != null) {
@@ -260,7 +268,7 @@ public class PipeTaskAgent {
pipeName,
creationTime,
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
- return;
+ return false;
case DROPPED:
LOGGER.info(
"Pipe {} (creation time = {}) has already been dropped, but
the pipe task meta has not been cleaned up. "
@@ -284,16 +292,25 @@ public class PipeTaskAgent {
}
// create pipe tasks and trigger create() method for each pipe task
- final Map<TConsensusGroupId, PipeTask> pipeTasks = new
PipeBuilder(pipeMeta).build();
+ final Map<TConsensusGroupId, PipeTask> pipeTasks =
+ new PipeBuilder(pipeMetaFromConfigNode).build();
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.create();
}
- pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
+ pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(),
pipeTasks);
+
+ // No matter the pipe status from config node is RUNNING or STOPPED, we
always set the status
+ // of pipe meta to STOPPED when it is created. The STOPPED status should
always be the initial
+ // status of a pipe, which makes the status transition logic simpler.
+ final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
+ pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
+ final boolean needToStartPipe = pipeStatusFromConfigNode.get() ==
PipeStatus.RUNNING;
+ pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
+
+ pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
- // add pipe meta to pipe meta keeper
- // note that we do not need to set the status of pipe meta here, because
the status of pipe meta
- // is already set to STOPPED when it is created
- pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
+ // If the pipe status from config node is RUNNING, we will start the pipe
later.
+ return needToStartPipe;
}
private void dropPipe(String pipeName, long creationTime) {
@@ -551,7 +568,8 @@ public class PipeTaskAgent {
public synchronized void collectPipeMetaList(THeartbeatReq req,
THeartbeatResp resp)
throws TException {
- if (!req.isNeedPipeMetaList()) {
+ // do nothing if data node is removing or removed, or request does not
need pipe meta list
+ if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
return;
}
@@ -559,6 +577,7 @@ public class PipeTaskAgent {
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
+ LOGGER.info("Reporting pipe meta: {}", pipeMeta);
}
} catch (IOException e) {
throw new TException(e);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index 1dcc1fb9320..3e09b86e74f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -46,15 +46,13 @@ public class TsFileEpochManager {
// this would not happen, but just in case
if (!filePath2Epoch.containsKey(filePath)) {
- LOGGER.warn(
- String.format("PipeEngine: can not find TsFileEpoch for TsFile %s,
create it", filePath));
+ LOGGER.info(
+ String.format("Pipe: can not find TsFileEpoch for TsFile %s,
creating it", filePath));
filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
}
return new PipeRealtimeCollectEvent(
event,
- // TODO: we have to make sure that the TsFileInsertionEvent is the
last event of the
- // TsFileEpoch's life cycle
filePath2Epoch.remove(filePath),
resource.getDevices().stream()
.collect(Collectors.toMap(device -> device, device ->
EMPTY_MEASUREMENT_ARRAY)),
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index 929a7ce4f47..f69f55cd6f7 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
-import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
/** Only fetch read request. For write request, return SUCCESS directly. */
public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements
ISyncInfoFetcher {
@Override
public List<PipeInfo> getAllPipeInfos() {
- try (ConfigNodeClient configNodeClient =
-
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo();
- return resp.getAllPipeInfo().stream()
- .map(PipeInfo::deserializePipeInfo)
- .collect(Collectors.toList());
- } catch (Exception e) {
- LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e);
- return Collections.emptyList();
- }
+ return Collections.emptyList();
}
@Override