This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e03b9be05ad Pipe: Fixed the bug that mod file may not close in async transferring (#12347)
e03b9be05ad is described below

commit e03b9be05ad0522a3d8a06dc3c32f5ff4ab60339
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 18 14:51:46 2024 +0800

    Pipe: Fixed the bug that mod file may not close in async transferring (#12347)
---
 .../PipeTransferTsFileInsertionEventHandler.java   | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 467405cf82a..bcafce049aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -71,7 +71,7 @@ public class PipeTransferTsFileInsertionEventHandler
   private AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileInsertionEventHandler(
-      PipeTsFileInsertionEvent event, IoTDBDataRegionAsyncConnector connector)
+      final PipeTsFileInsertionEvent event, final 
IoTDBDataRegionAsyncConnector connector)
       throws FileNotFoundException {
     this.event = event;
     this.connector = connector;
@@ -93,7 +93,8 @@ public class PipeTransferTsFileInsertionEventHandler
     isSealSignalSent = new AtomicBoolean(false);
   }
 
-  public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException, IOException {
+  public void transfer(final AsyncPipeDataTransferServiceClient client)
+      throws TException, IOException {
     this.client = client;
     client.setShouldReturnSelf(false);
 
@@ -103,6 +104,11 @@ public class PipeTransferTsFileInsertionEventHandler
       if (currentFile == modFile) {
         currentFile = tsFile;
         position = 0;
+        try {
+          reader.close();
+        } catch (final IOException e) {
+          LOGGER.warn("Failed to close file reader when successfully 
transferred mod file.", e);
+        }
         reader = new RandomAccessFile(tsFile, "r");
         transfer(client);
       } else if (currentFile == tsFile) {
@@ -132,7 +138,7 @@ public class PipeTransferTsFileInsertionEventHandler
   }
 
   @Override
-  public void onComplete(TPipeTransferResp response) {
+  public void onComplete(final TPipeTransferResp response) {
     if (isSealSignalSent.get()) {
       try {
         final TSStatus status = response.getStatus();
@@ -147,7 +153,7 @@ public class PipeTransferTsFileInsertionEventHandler
                       "Seal file %s error, result status %s.", tsFile, 
response.getStatus()),
                   tsFile.getName());
         }
-      } catch (Exception e) {
+      } catch (final Exception e) {
         onError(e);
         return;
       }
@@ -156,7 +162,7 @@ public class PipeTransferTsFileInsertionEventHandler
         if (reader != null) {
           reader.close();
         }
-      } catch (IOException e) {
+      } catch (final IOException e) {
         LOGGER.warn("Failed to close file reader when successfully transferred 
file.", e);
       } finally {
         
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
 true);
@@ -175,12 +181,12 @@ public class PipeTransferTsFileInsertionEventHandler
       return;
     }
 
-    // if the isSealSignalSent is false, then the response must be a 
PipeTransferFilePieceResp
+    // If the isSealSignalSent is false, then the response must be a 
PipeTransferFilePieceResp
     try {
       final PipeTransferFilePieceResp resp =
           PipeTransferFilePieceResp.fromTPipeTransferResp(response);
 
-      // this case only happens when the connection is broken, and the 
connector is reconnected
+      // This case only happens when the connection is broken, and the 
connector is reconnected
       // to the receiver, then the receiver will redirect the file position to 
the last position
       final long code = resp.getStatus().getCode();
 
@@ -200,13 +206,13 @@ public class PipeTransferTsFileInsertionEventHandler
       }
 
       transfer(client);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       onError(e);
     }
   }
 
   @Override
-  public void onError(Exception exception) {
+  public void onError(final Exception exception) {
     LOGGER.warn(
         "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit 
id {}).",
         tsFile,
@@ -218,7 +224,7 @@ public class PipeTransferTsFileInsertionEventHandler
       if (reader != null) {
         reader.close();
       }
-    } catch (IOException e) {
+    } catch (final IOException e) {
       LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
     } finally {
       connector.addFailureEventToRetryQueue(event);

Reply via email to