This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9c43479c8 Pipe: Modify epoch status metric changes (#16272)
8e9c43479c8 is described below
commit 8e9c43479c8538af23b376f43023510d7ee6c525
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 27 16:31:38 2025 +0800
Pipe: Modify epoch status metric changes (#16272)
* Pipe: Modify epoch status metric changes
* fix
---
.../PipeRealtimeDataRegionHybridSource.java | 1 +
.../realtime/PipeRealtimeDataRegionLogSource.java | 1 +
.../realtime/PipeRealtimeDataRegionSource.java | 15 ++++++++++++
.../PipeRealtimeDataRegionTsFileSource.java | 2 ++
.../dataregion/realtime/epoch/TsFileEpoch.java | 28 +++++++++++++++++++---
.../realtime/epoch/TsFileEpochManager.java | 3 ---
6 files changed, 44 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 25256238794..7cf476a66cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -53,6 +53,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
extractTabletInsertion(event);
} else if (eventToExtract instanceof TsFileInsertionEvent) {
extractTsFileInsertion(event);
+ event.getTsFileEpoch().clearState(this);
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index def6649b943..35a2c7190c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -47,6 +47,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
extractTabletInsertion(event);
} else if (eventToExtract instanceof TsFileInsertionEvent) {
extractTsFileInsertion(event);
+ event.getTsFileEpoch().clearState(this);
} else if (eventToExtract instanceof PipeHeartbeatEvent) {
extractHeartbeat(event);
} else if (eventToExtract instanceof PipeDeleteDataNodeEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 7c547e94407..cf29544744d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -63,6 +63,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -121,6 +122,8 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
private boolean sloppyPattern; // true to disable pattern filter after
extraction
+ private AtomicLong extractEpochSize = new AtomicLong();
+
// This queue is used to store pending events extracted by the method
extract(). The method
// supply() will poll events from this queue and send them to the next pipe
plugin.
protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -646,4 +649,16 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
public String getTaskID() {
return taskID;
}
+
+ public void increaseExtractEpochSize() {
+ extractEpochSize.incrementAndGet();
+ }
+
+ public void decreaseExtractEpochSize() {
+ extractEpochSize.decrementAndGet();
+ }
+
+ public boolean extractEpochSizeIsEmpty() {
+ return extractEpochSize.get() == 0;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index f666589dda3..8f74bc63fb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -74,6 +74,8 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
// Ignore the event.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
}
+
+ event.getTsFileEpoch().clearState(this);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
index b1ba0d37084..1f9f7d178f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpoch.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.pipe.metric.source.PipeDataRegionSourceMetrics;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -49,9 +50,30 @@ public class TsFileEpoch {
public void migrateState(
final PipeRealtimeDataRegionSource extractor, final
TsFileEpochStateMigrator visitor) {
- dataRegionExtractor2State
- .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
- .getAndUpdate(visitor::migrate);
+ AtomicReference<State> stateRef = dataRegionExtractor2State.get(extractor);
+
+ if (stateRef == null) {
+ dataRegionExtractor2State.putIfAbsent(
+ extractor, stateRef = new AtomicReference<>(State.EMPTY));
+ extractor.increaseExtractEpochSize();
+ setExtractorsRecentProcessedTsFileEpochState();
+ }
+
+ State migratedState = visitor.migrate(stateRef.get());
+ if (!Objects.equals(stateRef.get(), migratedState)) {
+ stateRef.set(migratedState);
+ setExtractorsRecentProcessedTsFileEpochState();
+ }
+ }
+
+ public void clearState(final PipeRealtimeDataRegionSource extractor) {
+ if (dataRegionExtractor2State.containsKey(extractor)) {
+ extractor.decreaseExtractEpochSize();
+ }
+ if (extractor.extractEpochSizeIsEmpty()) {
+ PipeDataRegionSourceMetrics.getInstance()
+ .setRecentProcessedTsFileEpochState(extractor.getTaskID(),
State.EMPTY);
+ }
}
public void setExtractorsRecentProcessedTsFileEpochState() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
index 10951b2db8f..887321add22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -61,9 +61,6 @@ public class TsFileEpochManager {
});
final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
- // When all data corresponding to this TsFileEpoch have been extracted,
update the state
- // of the extractors processing this TsFileEpoch.
- epoch.setExtractorsRecentProcessedTsFileEpochState();
LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
return new PipeRealtimeEvent(