This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9cebfcf34 fix:the HdfsStorage can not delete checkpoint file #5046 (#5054)
9cebfcf34 is described below
commit 9cebfcf340bcca4defaccb889fe72df6476cbe1e
Author: wu-a-ge <[email protected]>
AuthorDate: Mon Jul 10 21:13:41 2023 +0800
fix:the HdfsStorage can not delete checkpoint file #5046 (#5054)
---
.../seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 9dcc94f80..dc819c6ad 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -283,7 +283,9 @@ public class HdfsStorage extends AbstractCheckpointStorage {
if (pipelineId.equals(getPipelineIdByFileName(fileName))
&& checkpointId.equals(getCheckpointIdByFileName(fileName))) {
try {
- fs.delete(new Path(fileName), false);
+ fs.delete(
+ new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName),
+ false);
} catch (Exception e) {
log.error(
"Failed to delete checkpoint {} for job {}, pipeline {}",
@@ -311,7 +313,9 @@ public class HdfsStorage extends AbstractCheckpointStorage {
if (pipelineId.equals(getPipelineIdByFileName(fileName))
&& checkpointIdList.contains(checkpointIdByFileName)) {
try {
- fs.delete(new Path(fileName), false);
+ fs.delete(
+ new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName),
+ false);
} catch (Exception e) {
log.error(
"Failed to delete checkpoint {} for job {}, pipeline {}",