This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6d445f92a1e Pipe: Degraded the lock in PipeEventCollector to avoid
waitForTsFileClose() blocking pipe drop (#12518)
6d445f92a1e is described below
commit 6d445f92a1ea31f8aca183f3ba6c5d6cdfea48c6
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 13 19:51:03 2024 +0800
Pipe: Degraded the lock in PipeEventCollector to avoid waitForTsFileClose()
blocking pipe drop (#12518)
---
.../db/pipe/task/connection/PipeEventCollector.java | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c7e34b53aa6..38779d44bb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -55,7 +55,9 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
public PipeEventCollector(
- BoundedBlockingPendingQueue<Event> pendingQueue, long creationTime, int
regionId) {
+ final BoundedBlockingPendingQueue<Event> pendingQueue,
+ final long creationTime,
+ final int regionId) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
@@ -63,7 +65,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
@Override
- public synchronized void collect(Event event) {
+ public void collect(final Event event) {
try {
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
parseAndCollectEvent((PipeInsertNodeTabletInsertionEvent) event);
@@ -74,16 +76,17 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
} else {
collectEvent(event);
}
- } catch (PipeException e) {
+ } catch (final PipeException e) {
throw e;
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new PipeException("Error occurred when collecting events from
processor.", e);
}
}
- private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent
sourceEvent) {
+ private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent
sourceEvent) {
if (sourceEvent.shouldParseTimeOrPattern()) {
- for (PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
+ for (final PipeRawTabletInsertionEvent parsedEvent :
+ sourceEvent.toRawTabletInsertionEvents()) {
collectEvent(parsedEvent);
}
} else {
@@ -91,7 +94,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
}
- private void parseAndCollectEvent(PipeRawTabletInsertionEvent sourceEvent) {
+ private void parseAndCollectEvent(final PipeRawTabletInsertionEvent
sourceEvent) {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
@@ -102,7 +105,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
}
- private void parseAndCollectEvent(PipeTsFileInsertionEvent sourceEvent)
throws Exception {
+ private void parseAndCollectEvent(final PipeTsFileInsertionEvent
sourceEvent) throws Exception {
if (!sourceEvent.waitForTsFileClose()) {
LOGGER.warn(
"Pipe skipping temporary TsFile which shouldn't be transferred: {}",
@@ -124,7 +127,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
}
- private void collectEvent(Event event) {
+ private synchronized void collectEvent(final Event event) {
collectInvocationCount.incrementAndGet();
if (event instanceof EnrichedEvent) {