This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb728f04cc8 Pipe: Added RemainingEventCount/EstimatedRemainingSeconds
in configNode metrics/show pipes response (#12578)
bb728f04cc8 is described below
commit bb728f04cc890f5c756506dc74a2b93bf137bfae
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 29 11:06:45 2024 +0800
Pipe: Added RemainingEventCount/EstimatedRemainingSeconds in configNode
metrics/show pipes response (#12578)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../heartbeat/DataNodeHeartbeatHandler.java | 6 +-
.../response/pipe/task/PipeTableResp.java | 95 ++++++-----
.../pipe/agent/task/PipeConfigNodeTaskAgent.java | 48 ++++--
.../runtime/PipeRuntimeCoordinator.java | 10 +-
.../runtime/heartbeat/PipeHeartbeat.java | 32 +++-
.../runtime/heartbeat/PipeHeartbeatParser.java | 15 +-
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 12 +-
.../extractor/ConfigRegionListeningFilter.java | 2 +-
.../manager/pipe/metric/PipeConfigNodeMetrics.java | 2 +
.../metric/PipeConfigNodeRemainingTimeMetrics.java | 9 ++
.../PipeConfigNodeRemainingTimeOperator.java | 8 +-
.../metric/PipeConfigRegionExtractorMetrics.java | 14 ++
.../pipe/metric/PipeTemporaryMetaMetrics.java | 175 +++++++++++++++++++++
.../confignode/persistence/pipe/PipeInfo.java | 75 ++++++---
.../confignode/persistence/pipe/PipeTaskInfo.java | 5 +-
.../consensus/response/pipe/PipeTableRespTest.java | 4 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 48 ++++--
.../schemaregion/SchemaRegionListeningFilter.java | 2 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 11 ++
.../common/header/ColumnHeaderConstant.java | 6 +-
.../execution/config/sys/pipe/ShowPipeTask.java | 45 +++++-
.../iotdb/commons/pipe/task/meta/PipeMeta.java | 9 +-
.../commons/pipe/task/meta/PipeTemporaryMeta.java | 33 +++-
.../iotdb/commons/service/metric/enums/Metric.java | 2 +
.../src/main/thrift/confignode.thrift | 2 +
.../src/main/thrift/datanode.thrift | 4 +
26 files changed, 555 insertions(+), 119 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 5829fd07a56..9d42c319a44 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -131,7 +131,11 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
}
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(
- nodeId, heartbeatResp.getPipeMetaList(),
heartbeatResp.getPipeCompletedList());
+ nodeId,
+ heartbeatResp.getPipeMetaList(),
+ heartbeatResp.getPipeCompletedList(),
+ heartbeatResp.getPipeRemainingEventCountList(),
+ heartbeatResp.getPipeRemainingTimeList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 9385f43f9a8..221da8308eb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -20,14 +20,18 @@
package org.apache.iotdb.confignode.consensus.response.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
+import
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -36,13 +40,14 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
public class PipeTableResp implements DataSet {
private final TSStatus status;
private final List<PipeMeta> allPipeMeta;
- public PipeTableResp(TSStatus status, List<PipeMeta> allPipeMeta) {
+ public PipeTableResp(final TSStatus status, final List<PipeMeta>
allPipeMeta) {
this.status = status;
this.allPipeMeta = allPipeMeta;
}
@@ -56,46 +61,41 @@ public class PipeTableResp implements DataSet {
if (pipeName == null) {
return this;
} else {
- final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
- for (final PipeMeta pipeMeta : allPipeMeta) {
- if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
- filteredPipeMeta.add(pipeMeta);
- break;
- }
- }
- return new PipeTableResp(status, filteredPipeMeta);
+ return new PipeTableResp(
+ status,
+ allPipeMeta.stream()
+ .filter(pipeMeta ->
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
+ .collect(Collectors.toList()));
}
} else {
if (pipeName == null) {
return this;
} else {
- String sortedConnectorParametersString = null;
- for (final PipeMeta pipeMeta : allPipeMeta) {
- if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
- sortedConnectorParametersString =
- pipeMeta.getStaticMeta().getConnectorParameters().toString();
- break;
- }
- }
+ final String sortedConnectorParametersString =
+ allPipeMeta.stream()
+ .filter(pipeMeta ->
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
+ .findFirst()
+ .map(pipeMeta ->
pipeMeta.getStaticMeta().getConnectorParameters().toString())
+ .orElse(null);
- final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
- for (final PipeMeta pipeMeta : allPipeMeta) {
- if (pipeMeta
- .getStaticMeta()
- .getConnectorParameters()
- .toString()
- .equals(sortedConnectorParametersString)) {
- filteredPipeMeta.add(pipeMeta);
- }
- }
- return new PipeTableResp(status, filteredPipeMeta);
+ return new PipeTableResp(
+ status,
+ allPipeMeta.stream()
+ .filter(
+ pipeMeta ->
+ pipeMeta
+ .getStaticMeta()
+ .getConnectorParameters()
+ .toString()
+ .equals(sortedConnectorParametersString))
+ .collect(Collectors.toList()));
}
}
}
public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException
{
final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
- for (PipeMeta pipeMeta : allPipeMeta) {
+ for (final PipeMeta pipeMeta : allPipeMeta) {
pipeInformationByteBuffers.add(pipeMeta.serialize());
}
return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
@@ -104,19 +104,21 @@ public class PipeTableResp implements DataSet {
public TShowPipeResp convertToTShowPipeResp() {
final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();
- for (PipeMeta pipeMeta : allPipeMeta) {
+ for (final PipeMeta pipeMeta : allPipeMeta) {
final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
final StringBuilder exceptionMessageBuilder = new StringBuilder();
- for (PipeRuntimeException e :
runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
+ for (final PipeRuntimeException e :
+ runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
exceptionMessageBuilder
.append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
.append(", ")
.append(e.getMessage())
.append("\n");
}
- for (PipeTaskMeta pipeTaskMeta :
runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
- for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
+ for (final PipeTaskMeta pipeTaskMeta :
+ runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
+ for (final PipeRuntimeException e :
pipeTaskMeta.getExceptionMessages()) {
exceptionMessageBuilder
.append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
.append(", ")
@@ -125,7 +127,7 @@ public class PipeTableResp implements DataSet {
}
}
- showPipeInfoList.add(
+ final TShowPipeInfo showPipeInfo =
new TShowPipeInfo(
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
@@ -133,11 +135,34 @@ public class PipeTableResp implements DataSet {
staticMeta.getExtractorParameters().toString(),
staticMeta.getProcessorParameters().toString(),
staticMeta.getConnectorParameters().toString(),
- exceptionMessageBuilder.toString()));
+ exceptionMessageBuilder.toString());
+ final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
+ final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);
+
+ showPipeInfo.setRemainingEventCount(
+ canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents());
+ showPipeInfo.setEstimatedRemainingTime(
+ canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime());
+ showPipeInfoList.add(showPipeInfo);
}
// sorted by pipe name
showPipeInfoList.sort(Comparator.comparing(pipeInfo -> pipeInfo.id));
return new
TShowPipeResp().setStatus(status).setPipeInfoList(showPipeInfoList);
}
+
+ private boolean canCalculateOnLocal(final PipeMeta pipeMeta) {
+ try {
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getNodeManager()
+ .getRegisteredDataNodeCount()
+ == 1
+ && ConfigRegionListeningFilter.parseListeningPlanTypeSet(
+ pipeMeta.getStaticMeta().getExtractorParameters())
+ .isEmpty();
+ } catch (final IllegalPathException e) {
+ return false;
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 5f9614eceb2..fb8507be9d4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
+import
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
+import
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
@@ -63,14 +65,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
}
@Override
- protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta
pipeMetaFromConfigNode)
+ protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta
pipeMetaFromConfigNode)
throws IllegalPathException {
return new PipeConfigNodeTaskBuilder(pipeMetaFromConfigNode).build();
}
@Override
protected void createPipeTask(
- int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta
pipeTaskMeta)
+ final int consensusGroupId,
+ final PipeStaticMeta pipeStaticMeta,
+ final PipeTaskMeta pipeTaskMeta)
throws IllegalPathException {
// Advance the extractor parameters parsing logic to avoid creating
un-relevant pipeTasks
if (consensusGroupId == Integer.MIN_VALUE
@@ -106,12 +110,12 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
@Override
protected TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChangesInternal(
- PipeMeta pipeMetaFromCoordinator) {
+ final PipeMeta pipeMetaFromCoordinator) {
try {
return PipeConfigNodeAgent.runtime().isLeaderReady()
?
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
: null;
- } catch (Exception e) {
+ } catch (final Exception e) {
return new TPushPipeMetaRespExceptionMessage(
pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
e.getMessage(),
@@ -120,7 +124,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
}
@Override
- protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String
pipeName) {
+ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final
String pipeName) {
return PipeConfigNodeAgent.runtime().isLeaderReady()
? super.handleDropPipeInternal(pipeName)
: null;
@@ -128,7 +132,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
@Override
protected List<TPushPipeMetaRespExceptionMessage>
handlePipeMetaChangesInternal(
- List<PipeMeta> pipeMetaListFromCoordinator) {
+ final List<PipeMeta> pipeMetaListFromCoordinator) {
if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
return Collections.emptyList();
}
@@ -148,13 +152,13 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
.collect(Collectors.toList()));
clearConfigRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator);
return exceptionMessages;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException("failed to handle pipe meta changes", e);
}
}
private void clearConfigRegionListeningQueueIfNecessary(
- List<PipeMeta> pipeMetaListFromCoordinator) {
+ final List<PipeMeta> pipeMetaListFromCoordinator) {
final AtomicLong listeningQueueNewFirstIndex = new
AtomicLong(Long.MAX_VALUE);
// Check each pipe
@@ -187,8 +191,8 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
}
@Override
- protected void collectPipeMetaListInternal(TPipeHeartbeatReq req,
TPipeHeartbeatResp resp)
- throws TException {
+ protected void collectPipeMetaListInternal(
+ final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws
TException {
// Do nothing if data node is removing or removed, or request does not
need pipe meta list
if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
return;
@@ -197,6 +201,8 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
LOGGER.info("Received pipe heartbeat request {} from config coordinator.",
req.heartbeatId);
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+ final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+ final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final boolean shouldPrintLog =
System.currentTimeMillis() - lastLogPrintedTime.get() > 1000 * 60 *
10; // 10 minutes
@@ -206,14 +212,32 @@ public class PipeConfigNodeTaskAgent extends
PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
+
+ final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+ final long remainingEventCount =
+ PipeConfigRegionExtractorMetrics.getInstance()
+ .getRemainingEventCount(staticMeta.getPipeName(),
staticMeta.getCreationTime());
+ final double remainingTime =
+ PipeConfigNodeRemainingTimeMetrics.getInstance()
+ .getRemainingTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
+
+ pipeRemainingEventCountList.add(remainingEventCount);
+ pipeRemainingTimeList.add(remainingTime);
+
if (shouldPrintLog) {
- LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+ LOGGER.info(
+ "Reporting pipe meta: {}, remainingEventCount: {},
remainingTime: {}",
+ pipeMeta.coreReportMessage(),
+ remainingEventCount,
+ remainingTime);
}
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new TException(e);
}
resp.setPipeMetaList(pipeMetaBinaryList);
+ resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+ resp.setPipeRemainingTimeList(pipeRemainingTimeList);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 97f893f4eae..d960c3fe88a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -107,9 +107,15 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
public void parseHeartbeat(
final int dataNodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
- /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+ /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
+ /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
+ /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
pipeHeartbeatScheduler.parseHeartbeat(
dataNodeId,
- new PipeHeartbeat(pipeMetaByteBufferListFromDataNode,
pipeCompletedListFromAgent));
+ new PipeHeartbeat(
+ pipeMetaByteBufferListFromDataNode,
+ pipeCompletedListFromAgent,
+ pipeRemainingEventCountListFromAgent,
+ pipeRemainingTimeListFromAgent));
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 203ba96ed44..4489bf5b957 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -31,30 +31,54 @@ import java.util.Map;
import java.util.Objects;
public class PipeHeartbeat {
-
private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
+ private final Map<PipeStaticMeta, Long> remainingEventCountMap = new
HashMap<>();
+ private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
public PipeHeartbeat(
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
- /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+ /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
+ /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
+ /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
final PipeMeta pipeMeta =
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent) &&
pipeCompletedListFromAgent.get(i));
+ // If remaining event count & remaining time can not be got, it implies
that the heartbeat is
+ // from an ancient version of DataNode. Here we guarantee that "0" will
not affect both of
+ // the final results and namely these dataNodes are omitted in
calculation.
+ remainingEventCountMap.put(
+ pipeMeta.getStaticMeta(),
+ Objects.nonNull(pipeCompletedListFromAgent)
+ ? pipeRemainingEventCountListFromAgent.get(i)
+ : 0L);
+ remainingTimeMap.put(
+ pipeMeta.getStaticMeta(),
+ Objects.nonNull(pipeRemainingTimeListFromAgent)
+ ? pipeRemainingTimeListFromAgent.get(i)
+ : 0d);
}
}
- public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
+ public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
return pipeMetaMap.get(pipeStaticMeta);
}
- public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
+ public Boolean isCompleted(final PipeStaticMeta pipeStaticMeta) {
return isCompletedMap.get(pipeStaticMeta);
}
+ public Long getRemainingEventCount(final PipeStaticMeta pipeStaticMeta) {
+ return remainingEventCountMap.get(pipeStaticMeta);
+ }
+
+ public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) {
+ return remainingTimeMap.get(pipeStaticMeta);
+ }
+
public boolean isEmpty() {
return pipeMetaMap.isEmpty();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 68ea140dda9..790799cae72 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
@@ -130,8 +131,8 @@ public class PipeHeartbeatParser {
final int nodeId,
final PipeHeartbeat pipeHeartbeat) {
for (final PipeMeta pipeMetaFromCoordinator :
pipeTaskInfo.get().getPipeMetaList()) {
- final PipeMeta pipeMetaFromAgent =
- pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
+ final PipeStaticMeta staticMeta =
pipeMetaFromCoordinator.getStaticMeta();
+ final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
if (pipeMetaFromAgent == null) {
LOGGER.info(
"PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
@@ -140,11 +141,11 @@ public class PipeHeartbeatParser {
continue;
}
+ final PipeTemporaryMeta temporaryMeta =
pipeMetaFromCoordinator.getTemporaryMeta();
+
// Remove completed pipes
- final Boolean isPipeCompletedFromAgent =
- pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
+ final Boolean isPipeCompletedFromAgent =
pipeHeartbeat.isCompleted(staticMeta);
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
- final PipeTemporaryMeta temporaryMeta =
pipeMetaFromCoordinator.getTemporaryMeta();
temporaryMeta.markDataNodeCompleted(nodeId);
@@ -159,6 +160,10 @@ public class PipeHeartbeatParser {
}
}
+ // Record statistics
+ temporaryMeta.setRemainingEvent(nodeId,
pipeHeartbeat.getRemainingEventCount(staticMeta));
+ temporaryMeta.setRemainingTime(nodeId,
pipeHeartbeat.getRemainingTime(staticMeta));
+
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 462b6a017a8..ad691ea8565 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -109,7 +109,11 @@ public class PipeHeartbeatScheduler {
(dataNodeId, resp) ->
pipeHeartbeatParser.parseHeartbeat(
dataNodeId,
- new PipeHeartbeat(resp.getPipeMetaList(),
resp.getPipeCompletedList())));
+ new PipeHeartbeat(
+ resp.getPipeMetaList(),
+ resp.getPipeCompletedList(),
+ resp.getPipeRemainingEventCountList(),
+ resp.getPipeRemainingTimeList())));
// config node heartbeat
try {
@@ -117,7 +121,11 @@ public class PipeHeartbeatScheduler {
PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
pipeHeartbeatParser.parseHeartbeat(
ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
- new PipeHeartbeat(configNodeResp.getPipeMetaList(), null));
+ new PipeHeartbeat(
+ configNodeResp.getPipeMetaList(),
+ null,
+ configNodeResp.getPipeRemainingEventCountList(),
+ configNodeResp.getPipeRemainingTimeList()));
} catch (final Exception e) {
LOGGER.warn("Failed to collect pipe meta list from config node task
agent", e);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index f782f3e0617..0e06bdba1c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -159,7 +159,7 @@ public class ConfigRegionListeningFilter {
}
public static Set<ConfigPhysicalPlanType>
parseListeningPlanTypeSet(PipeParameters parameters)
- throws IllegalPathException, IllegalArgumentException {
+ throws IllegalPathException {
Set<ConfigPhysicalPlanType> planTypes = new HashSet<>();
Set<PartialPath> inclusionOptions =
parseOptions(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index d819ccc075e..b02679f147e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -41,6 +41,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService);
PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
+ PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
}
@Override
@@ -51,5 +52,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService);
PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
+ PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 0ef7b48d1c7..378251eb45c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -147,6 +147,15 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
operator.markConfigRegionCommit();
}
+ //////////////////////////// Show pipes ////////////////////////////
+
+ public double getRemainingTime(final String pipeName, final long
creationTime) {
+ return remainingTimeOperatorMap
+ .computeIfAbsent(
+ pipeName + "_" + creationTime, k -> new
PipeConfigNodeRemainingTimeOperator())
+ .getRemainingTime();
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeConfigNodeRemainingTimeMetricsHolder {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index cbc23f2e9be..201aa775de5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -56,13 +56,7 @@ class PipeConfigNodeRemainingTimeOperator {
//////////////////////////// Remaining time calculation
////////////////////////////
/**
- * This will calculate the estimated remaining time of pipe.
- *
- * <p>Notes:
- *
- * <p>1. The events in pipe assigner are omitted.
- *
- * <p>2. Other pipes' events sharing the same connectorSubtasks may be
over-calculated.
+ * This will calculate the estimated remaining time of the given pipe's
config region subTask.
*
* @return The estimated remaining time
*/
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
index 879c31999d1..403fb8a2b6c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
@@ -117,6 +117,20 @@ public class PipeConfigRegionExtractorMetrics implements
IMetricSet {
extractorMap.remove(taskID);
}
+ //////////////////////////// Show pipes ////////////////////////////
+
+ public long getRemainingEventCount(final String pipeName, final long
creationTime) {
+ final String taskID = pipeName + "_" + creationTime;
+ final IoTDBConfigRegionExtractor extractor = extractorMap.get(taskID);
+ if (Objects.isNull(extractor)) {
+ LOGGER.warn(
+ "Failed to get remaining event count, IoTDBConfigRegionExtractor({})
does not exist",
+ taskID);
+ return 0;
+ }
+ return extractor.getUnTransferredEventCount();
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeConfigRegionExtractorMetricsHolder {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
new file mode 100644
index 00000000000..676b4cd154e
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics from the {@link
+ * PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe variables from
+ * the {@link PipeTemporaryMeta}.
+ *
+ * <p>Every pipe is tracked under a pipe ID of the form {@code <pipeName>_<creationTime>}.
+ */
+public class PipeTemporaryMetaMetrics implements IMetricSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class);
+
+  /** Separates the pipe name from the creation time inside a pipe ID. */
+  private static final String PIPE_ID_SEPARATOR = "_";
+
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  /**
+   * Stores the given metric service and creates the gauges of all pipes registered so far.
+   *
+   * @param metricService the metric service to create the gauges in
+   */
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).forEach(this::createMetrics);
+  }
+
+  private void createMetrics(final String pipeID) {
+    createAutoGauge(pipeID);
+  }
+
+  /** Creates the remaining-event-count and remaining-time gauges of the given pipe. */
+  private void createAutoGauge(final String pipeID) {
+    final PipeTemporaryMeta pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID);
+    if (Objects.isNull(pipeTemporaryMeta)) {
+      // The pipe is no longer registered, so there is nothing for a gauge to read from
+      return;
+    }
+    final String[] pipeNameAndCreationTime = splitPipeID(pipeID);
+    metricService.createAutoGauge(
+        Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        pipeTemporaryMeta,
+        PipeTemporaryMeta::getGlobalRemainingEvents,
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+    metricService.createAutoGauge(
+        Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
+        MetricLevel.IMPORTANT,
+        pipeTemporaryMeta,
+        PipeTemporaryMeta::getGlobalRemainingTime,
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+  }
+
+  /**
+   * Deregisters every pipe that is still registered.
+   *
+   * @param metricService not used, the gauges are removed from the metric service stored by
+   *     {@link #bindTo}
+   */
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).forEach(this::deregister);
+    if (!pipeTemporaryMetaMap.isEmpty()) {
+      LOGGER.warn(
+          "Failed to unbind from pipe temporary meta metrics, PipeTemporaryMeta map not empty");
+    }
+  }
+
+  private void removeMetrics(final String pipeID) {
+    removeAutoGauge(pipeID);
+  }
+
+  /** Removes the remaining-event-count and remaining-time gauges of the given pipe. */
+  private void removeAutoGauge(final String pipeID) {
+    final String[] pipeNameAndCreationTime = splitPipeID(pipeID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) ////////////////////////////
+
+  /**
+   * Starts tracking the temporary meta of the given pipe, and creates its gauges if a metric
+   * service has already been bound.
+   *
+   * @param pipeMeta the meta of the pipe to track
+   */
+  public void register(final PipeMeta pipeMeta) {
+    final String pipeID = getPipeID(pipeMeta);
+    pipeTemporaryMetaMap.putIfAbsent(pipeID, pipeMeta.getTemporaryMeta());
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  /**
+   * Stops tracking the given pipe and removes its gauges if a metric service has been bound.
+   *
+   * @param pipeID the pipe ID, in the form {@code <pipeName>_<creationTime>}
+   */
+  public void deregister(final String pipeID) {
+    if (!pipeTemporaryMetaMap.containsKey(pipeID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({}) does not exist",
+          pipeID);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(pipeID);
+    }
+    // Forget the pipe even when no metric service is bound, otherwise a pipe deregistered before
+    // bindTo would stay in the map and get gauges created by bindTo
+    pipeTemporaryMetaMap.remove(pipeID);
+  }
+
+  /**
+   * Synchronizes the tracked pipes with the given pipe metas: registers the pipes that are not
+   * tracked yet and deregisters the tracked pipes that are absent from the given metas.
+   *
+   * @param pipeMetaList the metas of the pipes that should be tracked afterwards
+   */
+  public void handleTemporaryMetaChanges(final Iterable<PipeMeta> pipeMetaList) {
+    final Set<String> pipeIDs = new HashSet<>();
+    pipeMetaList.forEach(
+        pipeMeta -> {
+          final String pipeID = getPipeID(pipeMeta);
+          if (!pipeTemporaryMetaMap.containsKey(pipeID)) {
+            register(pipeMeta);
+          }
+          pipeIDs.add(pipeID);
+        });
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).stream()
+        .filter(pipeID -> !pipeIDs.contains(pipeID))
+        .forEach(this::deregister);
+  }
+
+  //////////////////////////// pipe ID ////////////////////////////
+
+  private static String getPipeID(final PipeMeta pipeMeta) {
+    return pipeMeta.getStaticMeta().getPipeName()
+        + PIPE_ID_SEPARATOR
+        + pipeMeta.getStaticMeta().getCreationTime();
+  }
+
+  /**
+   * Splits a pipe ID built by {@link #getPipeID} into the pipe name and the creation time. The ID
+   * is split at the last separator since the pipe name itself may contain the separator.
+   */
+  private static String[] splitPipeID(final String pipeID) {
+    final int separatorIndex = pipeID.lastIndexOf(PIPE_ID_SEPARATOR);
+    return new String[] {
+      pipeID.substring(0, separatorIndex),
+      pipeID.substring(separatorIndex + PIPE_ID_SEPARATOR.length())
+    };
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTemporaryMetaMetricsHolder {
+
+    private static final PipeTemporaryMetaMetrics INSTANCE = new PipeTemporaryMetaMetrics();
+
+    private PipeTemporaryMetaMetricsHolder() {
+      // Empty constructor
+    }
+  }
+
+  public static PipeTemporaryMetaMetrics getInstance() {
+    return PipeTemporaryMetaMetricsHolder.INSTANCE;
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index ba3981b67f1..ea1664c27a0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -30,6 +30,10 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePla
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+import
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
+import
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
+import
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
+import
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -66,7 +70,7 @@ public class PipeInfo implements SnapshotProcessor {
///////////////////////////////// Non-query
/////////////////////////////////
- public TSStatus createPipe(CreatePipePlanV2 plan) {
+ public TSStatus createPipe(final CreatePipePlanV2 plan) {
try {
final Optional<PipeMeta> pipeMetaBeforeCreation =
Optional.ofNullable(
@@ -89,33 +93,37 @@ public class PipeInfo implements SnapshotProcessor {
throw new PipeException("Failed to increase listener
reference", e);
}
});
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(message.getMessage());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to create pipe", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to create pipe, because " + e.getMessage());
}
}
- public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
try {
pipeTaskInfo.setPipeStatus(plan);
PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to set pipe status", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to set pipe status, because " + e.getMessage());
}
}
- public TSStatus dropPipe(DropPipePlanV2 plan) {
+ public TSStatus dropPipe(final DropPipePlanV2 plan) {
try {
final Optional<PipeMeta> pipeMetaBeforeDrop =
Optional.ofNullable(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
@@ -130,48 +138,63 @@ public class PipeInfo implements SnapshotProcessor {
try {
PipeConfigNodeAgent.runtime()
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException("Failed to decrease listener
reference", e);
}
});
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(message.getMessage());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to drop pipe", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to drop pipe, because " + e.getMessage());
}
}
- public TSStatus alterPipe(AlterPipePlanV2 plan) {
+ public TSStatus alterPipe(final AlterPipePlanV2 plan) {
try {
pipeTaskInfo.alterPipe(plan);
PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to alter pipe", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to alter pipe, because " + e.getMessage());
}
}
- public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plans) {
+ /**
+ * Note: This interface is only used for subscription and thus irrelevant to
the {@link
+ * PipeConfigNodeSubtask}. Hence, we can skip the operation of {@link
PipeConfigNodeTaskAgent} and
+ * {@link PipeConfigRegionListener} here.
+ *
+ * @param plans An {@link OperateMultiplePipesPlanV2} consisting of many
subPlans
+ * @return result {@link TSStatus}
+ */
+ public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans)
{
try {
- return pipeTaskInfo.operateMultiplePipes(plans);
- } catch (Exception e) {
+ final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans);
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
+ return status;
+ } catch (final Exception e) {
LOGGER.error("Failed to create multiple pipes", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to create multiple pipes, because " +
e.getMessage());
}
}
- public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
+ public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) {
try {
pipeTaskInfo.handleLeaderChange(plan);
@@ -180,15 +203,17 @@ public class PipeInfo implements SnapshotProcessor {
pipeMetaListFromCoordinator.add(pipeMeta);
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to handle leader change", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to handle leader change, because " +
e.getMessage());
}
}
- public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+ public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
try {
pipeTaskInfo.handleMetaChanges(plan);
@@ -198,8 +223,10 @@ public class PipeInfo implements SnapshotProcessor {
pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName()));
}
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
+ PipeTemporaryMetaMetrics.getInstance()
+ .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Failed to handle meta changes", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage("Failed to handle meta changes, because " +
e.getMessage());
@@ -209,13 +236,13 @@ public class PipeInfo implements SnapshotProcessor {
///////////////////////////////// SnapshotProcessor
/////////////////////////////////
@Override
- public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+ public boolean processTakeSnapshot(final File snapshotDir) throws
IOException {
return pipeTaskInfo.processTakeSnapshot(snapshotDir)
&& pipePluginInfo.processTakeSnapshot(snapshotDir);
}
@Override
- public void processLoadSnapshot(File snapshotDir) throws IOException {
+ public void processLoadSnapshot(final File snapshotDir) throws IOException {
Exception loadPipeTaskInfoException = null;
Exception loadPipePluginInfoException = null;
@@ -226,14 +253,14 @@ public class PipeInfo implements SnapshotProcessor {
PipeConfigNodeAgent.runtime()
.increaseListenerReference(pipeMeta.getStaticMeta().getExtractorParameters());
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
LOGGER.error("Failed to load pipe task info from snapshot", ex);
loadPipeTaskInfoException = ex;
}
try {
pipePluginInfo.processLoadSnapshot(snapshotDir);
- } catch (Exception ex) {
+ } catch (final Exception ex) {
LOGGER.error("Failed to load pipe plugin info from snapshot", ex);
loadPipePluginInfoException = ex;
}
@@ -251,16 +278,16 @@ public class PipeInfo implements SnapshotProcessor {
///////////////////////////////// equals & hashCode
/////////////////////////////////
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
- PipeInfo pipeInfo = (PipeInfo) o;
- return Objects.equals(pipePluginInfo, pipeInfo.pipePluginInfo)
- && Objects.equals(pipeTaskInfo, pipeInfo.pipeTaskInfo);
+ final PipeInfo that = (PipeInfo) o;
+ return Objects.equals(pipePluginInfo, that.pipePluginInfo)
+ && Objects.equals(pipeTaskInfo, that.pipeTaskInfo);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 201f2f049ed..037b3b4fb95 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -402,10 +403,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
public TSStatus alterPipe(final AlterPipePlanV2 plan) {
acquireWriteLock();
try {
+ final PipeTemporaryMeta temporaryMeta =
+
pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta();
pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
pipeMetaKeeper.addPipeMeta(
plan.getPipeStaticMeta().getPipeName(),
- new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
+ new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta(),
temporaryMeta));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
releaseWriteLock();
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index d974ceeab0a..144b988df1e 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -79,7 +79,7 @@ public class PipeTableRespTest {
pipeTasks1.put(1, pipeTaskMeta1);
PipeStaticMeta pipeStaticMeta1 =
new PipeStaticMeta(
- "testPipe", 121, extractorAttributes1, processorAttributes1,
connectorAttributes1);
+ "testPipe1", 122, extractorAttributes1, processorAttributes1,
connectorAttributes1);
PipeRuntimeMeta pipeRuntimeMeta1 = new PipeRuntimeMeta(pipeTasks1);
pipeMetaList.add(new PipeMeta(pipeStaticMeta1, pipeRuntimeMeta1));
@@ -99,7 +99,7 @@ public class PipeTableRespTest {
pipeTasks2.put(1, pipeTaskMeta2);
PipeStaticMeta pipeStaticMeta2 =
new PipeStaticMeta(
- "testPipe", 121, extractorAttributes2, processorAttributes2,
connectorAttributes2);
+ "testPipe2", 123, extractorAttributes2, processorAttributes2,
connectorAttributes2);
PipeRuntimeMeta pipeRuntimeMeta2 = new PipeRuntimeMeta(pipeTasks2);
pipeMetaList.add(new PipeMeta(pipeStaticMeta2, pipeRuntimeMeta2));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 65c7699f398..9f3b7fe66ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -63,6 +63,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -287,6 +288,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
final List<Boolean> pipeCompletedList = new ArrayList<>();
+ final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+ final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final Optional<Logger> logger =
PipeResourceManager.log()
@@ -298,8 +301,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- final Map<Integer, PipeTask> pipeTaskMap =
- pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+ final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+
+ final Map<Integer, PipeTask> pipeTaskMap =
pipeTaskManager.getPipeTasks(staticMeta);
final boolean isAllDataRegionCompleted =
pipeTaskMap == null
|| pipeTaskMap.entrySet().stream()
@@ -320,14 +324,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
- pipeCompletedList.add(isAllDataRegionCompleted &&
includeDataAndNeedDrop);
+ final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
+ final Pair<Long, Double> remainingEventAndTime =
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .getRemainingEventAndTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
+ pipeCompletedList.add(isCompleted);
+ pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
+ pipeRemainingTimeList.add(remainingEventAndTime.getRight());
logger.ifPresent(
l ->
l.info(
- "Reporting pipe meta: {}, isCompleted: {}",
+ "Reporting pipe meta: {}, isCompleted: {},
remainingEventCount: {}, estimatedRemainingTime: {}",
pipeMeta.coreReportMessage(),
- includeDataAndNeedDrop));
+ isCompleted,
+ remainingEventAndTime.getLeft(),
+ remainingEventAndTime.getRight()));
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (final IOException | IllegalPathException e) {
@@ -335,6 +347,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
resp.setPipeMetaList(pipeMetaBinaryList);
resp.setPipeCompletedList(pipeCompletedList);
+ resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+ resp.setPipeRemainingTimeList(pipeRemainingTimeList);
+ PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}
@Override
@@ -353,6 +368,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
final List<Boolean> pipeCompletedList = new ArrayList<>();
+ final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+ final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final Optional<Logger> logger =
PipeResourceManager.log()
@@ -364,8 +381,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
- final Map<Integer, PipeTask> pipeTaskMap =
- pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+ final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+
+ final Map<Integer, PipeTask> pipeTaskMap =
pipeTaskManager.getPipeTasks(staticMeta);
final boolean isAllDataRegionCompleted =
pipeTaskMap == null
|| pipeTaskMap.entrySet().stream()
@@ -386,14 +404,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
- pipeCompletedList.add(isAllDataRegionCompleted &&
includeDataAndNeedDrop);
+ final boolean isCompleted = isAllDataRegionCompleted &&
includeDataAndNeedDrop;
+ final Pair<Long, Double> remainingEventAndTime =
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .getRemainingEventAndTime(staticMeta.getPipeName(),
staticMeta.getCreationTime());
+ pipeCompletedList.add(isCompleted);
+ pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
+ pipeRemainingTimeList.add(remainingEventAndTime.getRight());
logger.ifPresent(
l ->
l.info(
- "Reporting pipe meta: {}, isCompleted: {}",
+ "Reporting pipe meta: {}, isCompleted: {},
remainingEventCount: {}, estimatedRemainingTime: {}",
pipeMeta.coreReportMessage(),
- includeDataAndNeedDrop));
+ isCompleted,
+ remainingEventAndTime.getLeft(),
+ remainingEventAndTime.getRight()));
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (final IOException | IllegalPathException e) {
@@ -401,6 +427,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
resp.setPipeMetaList(pipeMetaBinaryList);
resp.setPipeCompletedList(pipeCompletedList);
+ resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+ resp.setPipeRemainingTimeList(pipeRemainingTimeList);
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
index 63c42cc98c2..4b6233b9aaf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
@@ -97,7 +97,7 @@ public class SchemaRegionListeningFilter {
}
public static Set<PlanNodeType> parseListeningPlanTypeSet(PipeParameters
parameters)
- throws IllegalPathException, IllegalArgumentException {
+ throws IllegalPathException {
Set<PlanNodeType> planTypes = new HashSet<>();
Set<PartialPath> inclusionOptions =
parseOptions(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 5173d64e04f..c1ecb8460b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,6 +198,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
+ //////////////////////////// Show pipes ////////////////////////////
+
+ public Pair<Long, Double> getRemainingEventAndTime(
+ final String pipeName, final long creationTime) {
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+ remainingEventAndTimeOperatorMap.computeIfAbsent(
+ pipeName + "_" + creationTime, k -> new
PipeDataNodeRemainingEventAndTimeOperator());
+ return new Pair<>(operator.getRemainingEvents(),
operator.getRemainingTime());
+ }
+
//////////////////////////// singleton ////////////////////////////
private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
index cb76c13aa0b..78e8a32630d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
@@ -168,6 +168,8 @@ public class ColumnHeaderConstant {
public static final String PIPE_PROCESSOR = "PipeProcessor";
public static final String PIPE_CONNECTOR = "PipeSink";
public static final String EXCEPTION_MESSAGE = "ExceptionMessage";
+ public static final String REMAINING_EVENT_COUNT = "RemainingEventCount";
+ public static final String ESTIMATED_REMAINING_SECONDS =
"EstimatedRemainingSeconds";
// column names for select into
public static final String SOURCE_DEVICE = "SourceDevice";
@@ -414,7 +416,9 @@ public class ColumnHeaderConstant {
new ColumnHeader(PIPE_EXTRACTOR, TSDataType.TEXT),
new ColumnHeader(PIPE_PROCESSOR, TSDataType.TEXT),
new ColumnHeader(PIPE_CONNECTOR, TSDataType.TEXT),
- new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT));
+ new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT),
+ new ColumnHeader(REMAINING_EVENT_COUNT, TSDataType.TEXT),
+ new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT));
public static final List<ColumnHeader> showTopicColumnHeaders =
ImmutableList.of(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 55e2e284e05..a4d0a9505b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -37,6 +38,7 @@ import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import java.util.List;
import java.util.stream.Collectors;
@@ -45,24 +47,24 @@ public class ShowPipeTask implements IConfigTask {
private final ShowPipesStatement showPipesStatement;
- public ShowPipeTask(ShowPipesStatement showPipesStatement) {
+ public ShowPipeTask(final ShowPipesStatement showPipesStatement) {
this.showPipesStatement = showPipesStatement;
}
@Override
- public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
return configTaskExecutor.showPipes(showPipesStatement);
}
public static void buildTSBlock(
- List<TShowPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult>
future) {
- List<TSDataType> outputDataTypes =
+ final List<TShowPipeInfo> pipeInfoList, final
SettableFuture<ConfigTaskResult> future) {
+ final List<TSDataType> outputDataTypes =
ColumnHeaderConstant.showPipeColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
- TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
- for (TShowPipeInfo tPipeInfo : pipeInfoList) {
+ final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ for (final TShowPipeInfo tPipeInfo : pipeInfoList) {
builder.getTimeColumnBuilder().writeLong(0L);
builder
.getColumnBuilder(0)
@@ -88,9 +90,38 @@ public class ShowPipeTask implements IConfigTask {
builder
.getColumnBuilder(6)
.writeBinary(new Binary(tPipeInfo.getExceptionMessage(),
TSFileConfig.STRING_CHARSET));
+
+ // Optional, default 0/0.0
+ long remainingEventCount = tPipeInfo.getRemainingEventCount();
+ double remainingTime = tPipeInfo.getEstimatedRemainingTime();
+
+ if (remainingEventCount == -1 && remainingTime == -1) {
+ final Pair<Long, Double> remainingEventAndTime =
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+ .getRemainingEventAndTime(tPipeInfo.getId(),
tPipeInfo.getCreationTime());
+ remainingEventCount = remainingEventAndTime.getLeft();
+ remainingTime = remainingEventAndTime.getRight();
+ }
+
+ builder
+ .getColumnBuilder(7)
+ .writeBinary(
+ new Binary(
+ tPipeInfo.isSetRemainingEventCount()
+ ? String.valueOf(remainingEventCount)
+ : "Unknown",
+ TSFileConfig.STRING_CHARSET));
+ builder
+ .getColumnBuilder(8)
+ .writeBinary(
+ new Binary(
+ tPipeInfo.isSetEstimatedRemainingTime()
+ ? String.format("%.2f", remainingTime)
+ : "Unknown",
+ TSFileConfig.STRING_CHARSET));
builder.declarePosition();
}
- DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader();
+ final DatasetHeader datasetHeader =
DatasetHeaderFactory.getShowPipeHeader();
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS,
builder.build(), datasetHeader));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index e30fcf11269..2cdb6659a9a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -37,9 +37,16 @@ public class PipeMeta {
private final PipeTemporaryMeta temporaryMeta;
public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta
runtimeMeta) {
+ this(staticMeta, runtimeMeta, new PipeTemporaryMeta());
+ }
+
+ public PipeMeta(
+ final PipeStaticMeta staticMeta,
+ final PipeRuntimeMeta runtimeMeta,
+ final PipeTemporaryMeta temporaryMeta) {
this.staticMeta = staticMeta;
this.runtimeMeta = runtimeMeta;
- this.temporaryMeta = new PipeTemporaryMeta();
+ this.temporaryMeta = temporaryMeta;
}
public PipeStaticMeta getStaticMeta() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 6da2be8e81f..14ef390e12b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -27,15 +27,33 @@ import java.util.concurrent.ConcurrentMap;
public class PipeTemporaryMeta {
private final ConcurrentMap<Integer, Integer> completedDataNodeIds = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new
ConcurrentHashMap<>();
public void markDataNodeCompleted(final int dataNodeId) {
completedDataNodeIds.put(dataNodeId, dataNodeId);
}
+ public void setRemainingEvent(final int dataNodeId, final long
remainingEventCount) {
+ nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
+ }
+
+ public void setRemainingTime(final int dataNodeId, final double
remainingTime) {
+ nodeId2RemainingTimeMap.put(dataNodeId, remainingTime);
+ }
+
public Set<Integer> getCompletedDataNodeIds() {
return completedDataNodeIds.keySet();
}
+ public long getGlobalRemainingEvents() {
+ return
nodeId2RemainingEventMap.values().stream().reduce(Long::sum).orElse(0L);
+ }
+
+ public double getGlobalRemainingTime() {
+ return
nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d);
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -45,16 +63,25 @@ public class PipeTemporaryMeta {
return false;
}
final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
- return Objects.equals(this.completedDataNodeIds,
that.completedDataNodeIds);
+ return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
+ && Objects.equals(this.nodeId2RemainingEventMap,
that.nodeId2RemainingEventMap)
+ && Objects.equals(this.nodeId2RemainingTimeMap,
that.nodeId2RemainingTimeMap);
}
@Override
public int hashCode() {
- return Objects.hash(completedDataNodeIds);
+ return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap,
nodeId2RemainingTimeMap);
}
@Override
public String toString() {
- return "PipeTemporaryMeta{" + "completedDataNodeIds=" +
completedDataNodeIds + '}';
+ return "PipeTemporaryMeta{"
+ + "completedDataNodeIds="
+ + completedDataNodeIds
+ + ", nodeId2RemainingEventMap="
+ + nodeId2RemainingEventMap
+ + ", nodeId2RemainingTimeMap="
+ + nodeId2RemainingTimeMap
+ + '}';
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 4a8c0fe691d..49f1e057625 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -163,6 +163,8 @@ public enum Metric {
UNTRANSFERRED_CONFIG_COUNT("untransferred_config_count"),
PIPE_CONNECTOR_CONFIG_TRANSFER("pipe_connector_config_transfer"),
PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
+ PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
+ PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
// subscription related
SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 28978144ae8..94fcb820e09 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -686,6 +686,8 @@ struct TShowPipeInfo {
5: required string pipeProcessor
6: required string pipeConnector
7: required string exceptionMessage
+ 8: optional i64 remainingEventCount
+ 9: optional double EstimatedRemainingTime
}
struct TGetAllPipeInfoResp {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 06d0fd8d0e7..68666d16261 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -295,6 +295,8 @@ struct TDataNodeHeartbeatResp {
12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
13: optional map<common.TConsensusGroupId, i64> consensusLogicalTimeMap
14: optional list<bool> pipeCompletedList
+ 15: optional list<i64> pipeRemainingEventCountList
+ 16: optional list<double> pipeRemainingTimeList
}
struct TPipeHeartbeatReq {
@@ -304,6 +306,8 @@ struct TPipeHeartbeatReq {
struct TPipeHeartbeatResp {
1: required list<binary> pipeMetaList
2: optional list<bool> pipeCompletedList
+ 3: optional list<i64> pipeRemainingEventCountList
+ 4: optional list<double> pipeRemainingTimeList
}
enum TSchemaLimitLevel{