This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e03b9be05ad Pipe: Fixed the bug that mod file may not close in async
transferring (#12347)
e03b9be05ad is described below
commit e03b9be05ad0522a3d8a06dc3c32f5ff4ab60339
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 18 14:51:46 2024 +0800
Pipe: Fixed the bug that mod file may not close in async transferring
(#12347)
---
.../PipeTransferTsFileInsertionEventHandler.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 467405cf82a..bcafce049aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -71,7 +71,7 @@ public class PipeTransferTsFileInsertionEventHandler
private AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileInsertionEventHandler(
- PipeTsFileInsertionEvent event, IoTDBDataRegionAsyncConnector connector)
+ final PipeTsFileInsertionEvent event, final IoTDBDataRegionAsyncConnector connector)
throws FileNotFoundException {
this.event = event;
this.connector = connector;
@@ -93,7 +93,8 @@ public class PipeTransferTsFileInsertionEventHandler
isSealSignalSent = new AtomicBoolean(false);
}
- public void transfer(AsyncPipeDataTransferServiceClient client) throws TException, IOException {
+ public void transfer(final AsyncPipeDataTransferServiceClient client)
+ throws TException, IOException {
this.client = client;
client.setShouldReturnSelf(false);
@@ -103,6 +104,11 @@ public class PipeTransferTsFileInsertionEventHandler
if (currentFile == modFile) {
currentFile = tsFile;
position = 0;
+ try {
+ reader.close();
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e);
+ }
reader = new RandomAccessFile(tsFile, "r");
transfer(client);
} else if (currentFile == tsFile) {
@@ -132,7 +138,7 @@ public class PipeTransferTsFileInsertionEventHandler
}
@Override
- public void onComplete(TPipeTransferResp response) {
+ public void onComplete(final TPipeTransferResp response) {
if (isSealSignalSent.get()) {
try {
final TSStatus status = response.getStatus();
@@ -147,7 +153,7 @@ public class PipeTransferTsFileInsertionEventHandler
"Seal file %s error, result status %s.", tsFile,
response.getStatus()),
tsFile.getName());
}
- } catch (Exception e) {
+ } catch (final Exception e) {
onError(e);
return;
}
@@ -156,7 +162,7 @@ public class PipeTransferTsFileInsertionEventHandler
if (reader != null) {
reader.close();
}
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("Failed to close file reader when successfully transferred
file.", e);
} finally {
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
true);
@@ -175,12 +181,12 @@ public class PipeTransferTsFileInsertionEventHandler
return;
}
- // if the isSealSignalSent is false, then the response must be a PipeTransferFilePieceResp
+ // If the isSealSignalSent is false, then the response must be a PipeTransferFilePieceResp
try {
final PipeTransferFilePieceResp resp =
PipeTransferFilePieceResp.fromTPipeTransferResp(response);
- // this case only happens when the connection is broken, and the connector is reconnected
+ // This case only happens when the connection is broken, and the connector is reconnected
// to the receiver, then the receiver will redirect the file position to the last position
final long code = resp.getStatus().getCode();
@@ -200,13 +206,13 @@ public class PipeTransferTsFileInsertionEventHandler
}
transfer(client);
- } catch (Exception e) {
+ } catch (final Exception e) {
onError(e);
}
}
@Override
- public void onError(Exception exception) {
+ public void onError(final Exception exception) {
LOGGER.warn(
"Failed to transfer TsFileInsertionEvent {} (committer key {}, commit
id {}).",
tsFile,
@@ -218,7 +224,7 @@ public class PipeTransferTsFileInsertionEventHandler
if (reader != null) {
reader.close();
}
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("Failed to close file reader when failed to transfer file.",
e);
} finally {
connector.addFailureEventToRetryQueue(event);