This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-source-reboot in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d3a83ecfaccbf1f70a5965c40e914131967688db Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Jun 3 00:17:03 2023 +0800 add logs for pipe meta changes --- .../confignode/persistence/pipe/PipeTaskInfo.java | 2 ++ .../runtime/PipeHandleMetaChangeProcedure.java | 20 ++++++++++++++++--- .../impl/pipe/runtime/PipeMetaSyncProcedure.java | 2 +- .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 5 +++-- .../db/pipe/agent/runtime/HeartbeatScheduler.java | 23 ---------------------- .../db/pipe/agent/runtime/PipeAgentLauncher.java | 12 ++++++++--- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 2c9485c29fc..d19c3588468 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -234,11 +234,13 @@ public class PipeTaskInfo implements SnapshotProcessor { } public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) { + LOGGER.info("Handling pipe meta changes ..."); pipeMetaKeeper.clear(); plan.getPipeMetaList() .forEach( pipeMeta -> { pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta); + LOGGER.info("Recording pipe meta: {}", pipeMeta); }); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 8f9f9207ebc..46a8fc1199a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .getValue() .getProgressIndex() .isAfter(runtimeMetaFromDataNode.getProgressIndex())) { - runtimeMetaOnConfigNode - .getValue() - .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()); + LOGGER.info( + "Updating progress index for (pipe name: {}, consensus group id: {}) ... Progress index on config node: {}, progress index from data node: {}", + pipeMetaOnConfigNode.getStaticMeta().getPipeName(), + runtimeMetaOnConfigNode.getKey(), + runtimeMetaOnConfigNode.getValue().getProgressIndex(), + runtimeMetaFromDataNode.getProgressIndex()); + LOGGER.info( + "Progress index for (pipe name: {}, consensus group id: {}) is updated to {}", + pipeMetaOnConfigNode.getStaticMeta().getPipeName(), + runtimeMetaOnConfigNode.getKey(), + runtimeMetaOnConfigNode + .getValue() + .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex())); + needWriteConsensusOnConfigNodes = true; } @@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV pipeTaskMetaOnConfigNode.clearExceptionMessages(); for (final PipeRuntimeException exception : runtimeMetaFromDataNode.getExceptionMessages()) { + pipeTaskMetaOnConfigNode.trackExceptionMessage(exception); if (exception instanceof PipeRuntimeCriticalException) { @@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .get() .equals(PipeStatus.STOPPED)) { pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); + needWriteConsensusOnConfigNodes = true; needPushPipeMetaToDataNodes = true; @@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .forEach( status -> { status.set(PipeStatus.STOPPED); + needWriteConsensusOnConfigNodes = true; needPushPipeMetaToDataNodes = true; 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 167008e2ed3..dfa2283775a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -97,7 +97,7 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes"); - pushPipeMetaToDataNodes(env); + // do nothing } @Override diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java index 46547704d08..43330fa392b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java @@ -52,8 +52,9 @@ public class PipeTaskMeta { return progressIndex.get(); } - public void updateProgressIndex(ProgressIndex updateIndex) { - progressIndex.updateAndGet(index -> index.updateToMinimumIsAfterProgressIndex(updateIndex)); + public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) { + return progressIndex.updateAndGet( + index -> index.updateToMinimumIsAfterProgressIndex(updateIndex)); } public int getLeaderDataNodeId() { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java deleted file mode 100644 index de3f30f388b..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java 
+++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.agent.runtime; - -/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */ -public class HeartbeatScheduler {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index adf15709aff..16f3716bc13 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -163,13 +163,19 @@ class PipeAgentLauncher { PipeAgent.task() .handlePipeMetaChanges( getAllPipeInfoResp.getAllPipeInfo().stream() - .map(PipeMeta::deserialize) + .map( + byteBuffer -> { + final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer); + LOGGER.info( + "Pulled pipe meta from config node: {}, recovering ...", pipeMeta); + return pipeMeta; + }) .collect(Collectors.toList())); - } catch (Throwable throwable) { + } catch (Exception e) { LOGGER.info( "Failed to get pipe task meta from config node. 
Ignore the exception, " + "because config node may not be ready yet, and meta will be pushed by config node later.", - throwable); + e); } } }
