This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 48028a15f3f Pipe: Adjusted some loggers of metrics to avoid
unnecessary warns & Include "lastEvent" into pipe's event count metrics &
Removed the "userConflict" judgment to data sync failure caused by
METADATA_ERROR (#12758)
48028a15f3f is described below
commit 48028a15f3f341d0ed5320693b43560fa414382f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 20 00:10:05 2024 +0800
Pipe: Adjusted some loggers of metrics to avoid unnecessary warns & Include
"lastEvent" into pipe's event count metrics & Removed the "userConflict"
judgment to data sync failure caused by METADATA_ERROR (#12758)
---
.../metric/PipeConfigNodeRemainingTimeMetrics.java | 10 ++---
.../metric/PipeConfigRegionConnectorMetrics.java | 2 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 12 ++++++
.../PipeDataNodeRemainingEventAndTimeOperator.java | 16 ++++++++
.../metric/PipeDataRegionConnectorMetrics.java | 4 +-
.../metric/PipeDataRegionExtractorMetrics.java | 8 ++--
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 48 +++++++++++-----------
.../metric/PipeSchemaRegionConnectorMetrics.java | 2 +-
.../visitor/PipeStatementTSStatusVisitor.java | 12 ++----
.../subtask/connector/PipeConnectorSubtask.java | 18 +++++---
.../subtask/processor/PipeProcessorSubtask.java | 11 +++++
.../processor/PipeProcessorSubtaskWorker.java | 21 +++++-----
.../queue/ConcurrentIterableLinkedQueue.java | 27 ++++++------
.../commons/pipe/task/meta/PipeTemporaryMeta.java | 8 ++--
14 files changed, 120 insertions(+), 79 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 557e355596e..bf974744b21 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -111,9 +111,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
public void thawRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
- LOGGER.warn(
- "Failed to thaw pipe remaining time rate, RemainingTimeOperator({})
does not exist",
- pipeID);
+ // The configNode may have no pipe task after "startPipe"
return;
}
remainingTimeOperatorMap.get(pipeID).thawRate(true);
@@ -121,9 +119,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
public void freezeRate(final String pipeID) {
if (!remainingTimeOperatorMap.containsKey(pipeID)) {
- LOGGER.warn(
- "Failed to freeze pipe remaining time rate,
RemainingTimeOperator({}) does not exist",
- pipeID);
+ // The configNode may have no pipe task after "stopPipe"
return;
}
remainingTimeOperatorMap.get(pipeID).freezeRate(true);
@@ -147,7 +143,7 @@ public class PipeConfigNodeRemainingTimeMetrics implements
IMetricSet {
}
final PipeConfigNodeRemainingTimeOperator operator =
remainingTimeOperatorMap.get(pipeID);
if (Objects.isNull(operator)) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe region commit, RemainingTimeOperator({}) does
not exist", pipeID);
return;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
index 01d9b4bb4d3..0900d9d4628 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionConnectorMetrics.java
@@ -127,7 +127,7 @@ public class PipeConfigRegionConnectorMetrics implements
IMetricSet {
}
final Rate rate = configRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe config region write plan event,
PipeConfigNodeSubtask({}) does not exist",
taskID);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 8f9008f5d9a..cff505d444e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -123,6 +124,17 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
+ public void register(final PipeProcessorSubtask processorSubtask) {
+ // The metric is global thus the regionId is omitted
+ final String pipeID = processorSubtask.getPipeName() + "_" +
processorSubtask.getCreationTime();
+ remainingEventAndTimeOperatorMap
+ .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
+ .register(processorSubtask);
+ if (Objects.nonNull(metricService)) {
+ createMetrics(pipeID);
+ }
+ }
+
public void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName,
final long creationTime) {
// The metric is global thus the regionId is omitted
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index dcab93d535c..8963c7019c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.event.Event;
import com.codahale.metrics.Clock;
@@ -40,6 +41,8 @@ import java.util.concurrent.atomic.AtomicReference;
class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final Set<IoTDBDataRegionExtractor> dataRegionExtractors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<PipeProcessorSubtask> dataRegionProcessors =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<PipeConnectorSubtask> dataRegionConnectors =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
@@ -57,6 +60,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ + dataRegionProcessors.stream()
+ .map(processorSubtask -> processorSubtask.getEventCount(false))
+ .reduce(Integer::sum)
+ .orElse(0)
+ dataRegionConnectors.stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
@@ -84,6 +91,10 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
+ + dataRegionProcessors.stream()
+ .map(processorSubtask -> processorSubtask.getEventCount(true))
+ .reduce(Integer::sum)
+ .orElse(0)
+ dataRegionConnectors.stream()
.map(connectorSubtask ->
connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
@@ -156,6 +167,11 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
dataRegionExtractors.add(extractor);
}
+ void register(final PipeProcessorSubtask processorSubtask) {
+ setNameAndCreationTime(processorSubtask.getPipeName(),
processorSubtask.getCreationTime());
+ dataRegionProcessors.add(processorSubtask);
+ }
+
void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName,
final long creationTime) {
setNameAndCreationTime(pipeName, creationTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
index ef41dd993f7..f467cac20bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionConnectorMetrics.java
@@ -280,7 +280,7 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe data region connector tablet event,
PipeConnectorSubtask({}) does not exist",
taskID);
return;
@@ -294,7 +294,7 @@ public class PipeDataRegionConnectorMetrics implements
IMetricSet {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe data region connector tsfile event,
PipeConnectorSubtask({}) does not exist",
taskID);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
index 6382297d71f..d18b4de51e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionExtractorMetrics.java
@@ -316,7 +316,7 @@ public class PipeDataRegionExtractorMetrics implements
IMetricSet {
}
final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe data region extractor tablet event,
IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
@@ -330,7 +330,7 @@ public class PipeDataRegionExtractorMetrics implements
IMetricSet {
}
final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe data region extractor tsfile event,
IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
@@ -344,7 +344,7 @@ public class PipeDataRegionExtractorMetrics implements
IMetricSet {
}
final Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe data region extractor heartbeat event,
IoTDBDataRegionExtractor({}) does not exist",
taskID);
return;
@@ -359,7 +359,7 @@ public class PipeDataRegionExtractorMetrics implements
IMetricSet {
}
final Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
if (gauge == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to set recent processed tsfile epoch state,
PipeRealtimeDataRegionExtractor({}) does not exist",
taskID);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 8cc2383c847..7f6697430bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -55,20 +55,20 @@ public class PipeProcessorMetrics implements IMetricSet {
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
- public void bindTo(AbstractMetricService metricService) {
+ public void bindTo(final AbstractMetricService metricService) {
this.metricService = metricService;
- ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
- for (String taskID : taskIDs) {
+ final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(processorMap.keySet());
+ for (final String taskID : taskIDs) {
createMetrics(taskID);
}
}
- private void createMetrics(String taskID) {
+ private void createMetrics(final String taskID) {
createRate(taskID);
}
- private void createRate(String taskID) {
- PipeProcessorSubtask processor = processorMap.get(taskID);
+ private void createRate(final String taskID) {
+ final PipeProcessorSubtask processor = processorMap.get(taskID);
// process event rate
tabletRateMap.put(
taskID,
@@ -106,9 +106,9 @@ public class PipeProcessorMetrics implements IMetricSet {
}
@Override
- public void unbindFrom(AbstractMetricService metricService) {
- ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
- for (String taskID : taskIDs) {
+ public void unbindFrom(final AbstractMetricService metricService) {
+ final ImmutableSet<String> taskIDs =
ImmutableSet.copyOf(processorMap.keySet());
+ for (final String taskID : taskIDs) {
deregister(taskID);
}
if (!processorMap.isEmpty()) {
@@ -116,12 +116,12 @@ public class PipeProcessorMetrics implements IMetricSet {
}
}
- private void removeMetrics(String taskID) {
+ private void removeMetrics(final String taskID) {
removeAutoGauge(taskID);
removeRate(taskID);
}
- private void removeAutoGauge(String taskID) {
+ private void removeAutoGauge(final String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
// pending event count
metricService.remove(
@@ -153,7 +153,7 @@ public class PipeProcessorMetrics implements IMetricSet {
String.valueOf(processor.getCreationTime()));
}
- private void removeRate(String taskID) {
+ private void removeRate(final String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
// process event rate
metricService.remove(
@@ -190,15 +190,15 @@ public class PipeProcessorMetrics implements IMetricSet {
//////////////////////////// register & deregister (pipe integration)
////////////////////////////
- public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
- String taskID = pipeProcessorSubtask.getTaskID();
+ public void register(@NonNull final PipeProcessorSubtask
pipeProcessorSubtask) {
+ final String taskID = pipeProcessorSubtask.getTaskID();
processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
if (Objects.nonNull(metricService)) {
createMetrics(taskID);
}
}
- public void deregister(String taskID) {
+ public void deregister(final String taskID) {
if (!processorMap.containsKey(taskID)) {
LOGGER.warn(
"Failed to deregister pipe processor metrics,
PipeProcessorSubtask({}) does not exist",
@@ -211,13 +211,13 @@ public class PipeProcessorMetrics implements IMetricSet {
processorMap.remove(taskID);
}
- public void markTabletEvent(String taskID) {
+ public void markTabletEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
- Rate rate = tabletRateMap.get(taskID);
+ final Rate rate = tabletRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe processor tablet event,
PipeProcessorSubtask({}) does not exist",
taskID);
return;
@@ -225,13 +225,13 @@ public class PipeProcessorMetrics implements IMetricSet {
rate.mark();
}
- public void markTsFileEvent(String taskID) {
+ public void markTsFileEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
- Rate rate = tsFileRateMap.get(taskID);
+ final Rate rate = tsFileRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe processor tsfile event,
PipeProcessorSubtask({}) does not exist",
taskID);
return;
@@ -239,13 +239,13 @@ public class PipeProcessorMetrics implements IMetricSet {
rate.mark();
}
- public void markPipeHeartbeatEvent(String taskID) {
+ public void markPipeHeartbeatEvent(final String taskID) {
if (Objects.isNull(metricService)) {
return;
}
- Rate rate = pipeHeartbeatRateMap.get(taskID);
+ final Rate rate = pipeHeartbeatRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe processor heartbeat event,
PipeProcessorSubtask({}) does not exist",
taskID);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
index 52553c4ebb2..e3062c9bd78 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeSchemaRegionConnectorMetrics.java
@@ -129,7 +129,7 @@ public class PipeSchemaRegionConnectorMetrics implements
IMetricSet {
}
final Rate rate = schemaRateMap.get(taskID);
if (rate == null) {
- LOGGER.warn(
+ LOGGER.info(
"Failed to mark pipe schema region write plan event,
PipeConnectorSubtask({}) does not exist",
taskID);
return;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 812f8d29151..03a917379ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -104,14 +104,10 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
} else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
- if
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
- && config.isEnablePartialInsert()) {
- return new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+ &&
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+ && config.isEnablePartialInsert())) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitStatement(insertBaseStatement, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index c2f7adaf8dd..aed8fb6213c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
@@ -233,15 +234,18 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
}
public int getTsFileInsertionEventCount() {
- return inputPendingQueue.getTsFileInsertionEventCount();
+ return inputPendingQueue.getTsFileInsertionEventCount()
+ + (lastEvent instanceof TsFileInsertionEvent ? 1 : 0);
}
public int getTabletInsertionEventCount() {
- return inputPendingQueue.getTabletInsertionEventCount();
+ return inputPendingQueue.getTabletInsertionEventCount()
+ + (lastEvent instanceof TabletInsertionEvent ? 1 : 0);
}
public int getPipeHeartbeatEventCount() {
- return inputPendingQueue.getPipeHeartbeatEventCount();
+ return inputPendingQueue.getPipeHeartbeatEventCount()
+ + (lastEvent instanceof PipeHeartbeatEvent ? 1 : 0);
}
public int getAsyncConnectorRetryEventQueueSize() {
@@ -262,7 +266,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
count.incrementAndGet();
}
});
- } catch (Exception e) {
+ } catch (final Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Exception occurred when counting event of pipe {}, root cause:
{}",
@@ -271,10 +275,14 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
e);
}
}
+ // Avoid potential NPE in "getPipeName"
+ final EnrichedEvent event =
+ lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
return count.get()
+ (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
? ((IoTDBDataRegionAsyncConnector)
outputPipeConnector).getRetryEventCount(pipeName)
- : 0);
+ : 0)
+ + (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1
: 0);
}
//////////////////////////// Error report ////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 32f37e20818..7ce959b87dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -87,6 +88,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
// Only register dataRegions
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))) {
PipeProcessorMetrics.getInstance().register(this);
+ PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
}
@@ -249,6 +251,15 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return regionId;
}
+ public int getEventCount(final boolean ignoreHeartbeat) {
+ // Avoid potential NPE in "getPipeName"
+ final EnrichedEvent event =
+ lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
+ return Objects.nonNull(event) && !(ignoreHeartbeat && event instanceof
PipeHeartbeatEvent)
+ ? 1
+ : 0;
+ }
+
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
index 64f01b24105..684fa5ebb0c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtaskWorker.java
@@ -24,8 +24,9 @@ import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class PipeProcessorSubtaskWorker extends WrappedRunnable {
@@ -40,8 +41,8 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
private int workingRoundInAdjustmentInterval = 0;
private long sleepingTimeInMilliSecond = 50;
- private final ConcurrentMap<PipeProcessorSubtask, PipeProcessorSubtask>
subtasks =
- new ConcurrentHashMap<>();
+ private final Set<PipeProcessorSubtask> subtasks =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
@SuppressWarnings("squid:S2189")
@@ -56,7 +57,7 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
private void cleanupClosedSubtasksIfNecessary() {
if (++closedSubtaskCleanupRoundCounter %
CLOSED_SUBTASK_CLEANUP_ROUND_INTERVAL == 0) {
- subtasks.keySet().stream()
+ subtasks.stream()
.filter(PipeProcessorSubtask::isClosed)
.collect(Collectors.toList())
.forEach(subtasks::remove);
@@ -68,7 +69,7 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
boolean canSleepBeforeNextRound = true;
- for (final PipeProcessorSubtask subtask : subtasks.keySet()) {
+ for (final PipeProcessorSubtask subtask : subtasks) {
if (subtask.isClosed() || !subtask.isSubmittingSelf() ||
subtask.isStoppedByException()) {
continue;
}
@@ -79,7 +80,7 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
canSleepBeforeNextRound = false;
}
subtask.onSuccess(hasAtLeastOneEventProcessed);
- } catch (Exception e) {
+ } catch (final Exception e) {
if (subtask.isClosed()) {
LOGGER.warn("subtask {} is closed, ignore exception", subtask, e);
} else {
@@ -91,11 +92,11 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
return canSleepBeforeNextRound;
}
- private void sleepIfNecessary(boolean canSleepBeforeNextRound) {
+ private void sleepIfNecessary(final boolean canSleepBeforeNextRound) {
if (canSleepBeforeNextRound) {
try {
Thread.sleep(sleepingTimeInMilliSecond);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
LOGGER.warn("subtask worker is interrupted", e);
Thread.currentThread().interrupt();
}
@@ -122,7 +123,7 @@ public class PipeProcessorSubtaskWorker extends
WrappedRunnable {
}
}
- public void schedule(PipeProcessorSubtask pipeProcessorSubtask) {
- subtasks.put(pipeProcessorSubtask, pipeProcessorSubtask);
+ public void schedule(final PipeProcessorSubtask pipeProcessorSubtask) {
+ subtasks.add(pipeProcessorSubtask);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
index 78a8c1a75fa..306b5491f32 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/ConcurrentIterableLinkedQueue.java
@@ -24,9 +24,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,8 +58,8 @@ public class ConcurrentIterableLinkedQueue<E> {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Condition hasNextCondition = lock.writeLock().newCondition();
- private final ConcurrentMap<DynamicIterator, DynamicIterator> iteratorSet =
- new ConcurrentHashMap<>();
+ private final Set<DynamicIterator> iteratorSet =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* Add an element to the tail of the queue.
@@ -110,7 +111,7 @@ public class ConcurrentIterableLinkedQueue<E> {
lock.writeLock().lock();
try {
// Iterate over iterators to find the minimum valid newFirstIndex
- for (final DynamicIterator iterator : iteratorSet.keySet()) {
+ for (final DynamicIterator iterator : iteratorSet) {
newFirstIndex = Math.min(newFirstIndex, iterator.getNextIndex());
}
newFirstIndex = Math.max(newFirstIndex, firstIndex);
@@ -138,14 +139,12 @@ public class ConcurrentIterableLinkedQueue<E> {
}
// Update iterators if necessary
- iteratorSet
- .keySet()
- .forEach(
- iterator -> {
- if (iterator.nextIndex == firstIndex) {
- iterator.currentNode = pilotNode;
- }
- });
+ iteratorSet.forEach(
+ iterator -> {
+ if (iterator.nextIndex == firstIndex) {
+ iterator.currentNode = pilotNode;
+ }
+ });
hasNextCondition.signalAll();
@@ -160,7 +159,7 @@ public class ConcurrentIterableLinkedQueue<E> {
lock.writeLock().lock();
try {
// Use a new set to avoid ConcurrentModificationException
-
ImmutableSet.copyOf(iteratorSet.keySet()).forEach(DynamicIterator::close);
+ ImmutableSet.copyOf(iteratorSet).forEach(DynamicIterator::close);
tryRemoveBefore(tailIndex);
} finally {
@@ -237,7 +236,7 @@ public class ConcurrentIterableLinkedQueue<E> {
public DynamicIterator iterateFrom(final long offset) {
final DynamicIterator iterator = new DynamicIterator(offset);
- iteratorSet.put(iterator, iterator);
+ iteratorSet.add(iterator);
return iterator;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 7a028f4b0e2..f444e6943b6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.task.meta;
+import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,12 +27,13 @@ import java.util.concurrent.ConcurrentMap;
public class PipeTemporaryMeta {
- private final ConcurrentMap<Integer, Integer> completedDataNodeIds = new
ConcurrentHashMap<>();
+ private final Set<Integer> completedDataNodeIds =
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new
ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new
ConcurrentHashMap<>();
public void markDataNodeCompleted(final int dataNodeId) {
- completedDataNodeIds.put(dataNodeId, dataNodeId);
+ completedDataNodeIds.add(dataNodeId);
}
public void markDataNodeUncompleted(final int dataNodeId) {
@@ -47,7 +49,7 @@ public class PipeTemporaryMeta {
}
public Set<Integer> getCompletedDataNodeIds() {
- return completedDataNodeIds.keySet();
+ return completedDataNodeIds;
}
public long getGlobalRemainingEvents() {