This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 40993a2b5b8 Pipe: decrease reference count for untransferred events in
batch when closing & fix parsing alter pipe name (#12305)
40993a2b5b8 is described below
commit 40993a2b5b8cf6c755a61e97bbc122e3d68b5de1
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 10 15:18:56 2024 +0800
Pipe: decrease reference count for untransferred events in batch when
closing & fix parsing alter pipe name (#12305)
---
.../builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java | 2 +-
.../evolvable/builder/PipeTransferBatchReqBuilder.java | 12 +++++++++---
.../apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java | 2 +-
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 77d7531df36..8a858bd9a36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -30,7 +30,7 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder
extends PipeTransferBatc
}
@Override
- public void onSuccess() {
+ public synchronized void onSuccess() {
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index e10e70478b6..0ea50ae73ae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -112,7 +112,8 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
* @param event the given {@link Event}
* @return {@link true} if the batch can be transferred
*/
- public boolean onEvent(TabletInsertionEvent event) throws IOException,
WALPipeException {
+ public synchronized boolean onEvent(TabletInsertionEvent event)
+ throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
return false;
}
@@ -139,7 +140,7 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
|| System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
- public void onSuccess() {
+ public synchronized void onSuccess() {
binaryBuffers.clear();
insertNodeBuffers.clear();
tabletBuffers.clear();
@@ -201,7 +202,12 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
}
@Override
- public void close() {
+ public synchronized void close() {
+ for (final Event event : events) {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName());
+ }
+ }
allocatedMemoryBlock.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 4e8c7b5185b..92cb1add365 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3634,7 +3634,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
final AlterPipeStatement alterPipeStatement = new
AlterPipeStatement(StatementType.ALTER_PIPE);
if (ctx.pipeName != null) {
- alterPipeStatement.setPipeName(ctx.pipeName.getText());
+ alterPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
} else {
throw new SemanticException(
"Not support for this sql in ALTER PIPE, please enter pipe name.");