This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40993a2b5b8 Pipe: decrease reference count for untransferred events in 
batch when closing & fix parsing alter pipe name (#12305)
40993a2b5b8 is described below

commit 40993a2b5b8cf6c755a61e97bbc122e3d68b5de1
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 10 15:18:56 2024 +0800

    Pipe: decrease reference count for untransferred events in batch when 
closing & fix parsing alter pipe name (#12305)
---
 .../builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java  |  2 +-
 .../evolvable/builder/PipeTransferBatchReqBuilder.java       | 12 +++++++++---
 .../apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java  |  2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index 77d7531df36..8a858bd9a36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -30,7 +30,7 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder 
extends PipeTransferBatc
   }
 
   @Override
-  public void onSuccess() {
+  public synchronized void onSuccess() {
     for (final Event event : events) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) event)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index e10e70478b6..0ea50ae73ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -112,7 +112,8 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
    * @param event the given {@link Event}
    * @return {@link true} if the batch can be transferred
    */
-  public boolean onEvent(TabletInsertionEvent event) throws IOException, 
WALPipeException {
+  public synchronized boolean onEvent(TabletInsertionEvent event)
+      throws IOException, WALPipeException {
     if (!(event instanceof EnrichedEvent)) {
       return false;
     }
@@ -139,7 +140,7 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
         || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
-  public void onSuccess() {
+  public synchronized void onSuccess() {
     binaryBuffers.clear();
     insertNodeBuffers.clear();
     tabletBuffers.clear();
@@ -201,7 +202,12 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
   }
 
   @Override
-  public void close() {
+  public synchronized void close() {
+    for (final Event event : events) {
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName());
+      }
+    }
     allocatedMemoryBlock.close();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 4e8c7b5185b..92cb1add365 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3634,7 +3634,7 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     final AlterPipeStatement alterPipeStatement = new 
AlterPipeStatement(StatementType.ALTER_PIPE);
 
     if (ctx.pipeName != null) {
-      alterPipeStatement.setPipeName(ctx.pipeName.getText());
+      alterPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
     } else {
       throw new SemanticException(
           "Not support for this sql in ALTER PIPE, please enter pipe name.");

Reply via email to