This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9cebfcf34 fix:the HdfsStorage can not delete checkpoint file #5046 
(#5054)
9cebfcf34 is described below

commit 9cebfcf340bcca4defaccb889fe72df6476cbe1e
Author: wu-a-ge <[email protected]>
AuthorDate: Mon Jul 10 21:13:41 2023 +0800

    fix:the HdfsStorage can not delete checkpoint file #5046 (#5054)
---
 .../seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 9dcc94f80..dc819c6ad 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -283,7 +283,9 @@ public class HdfsStorage extends AbstractCheckpointStorage {
                     if (pipelineId.equals(getPipelineIdByFileName(fileName))
                             && 
checkpointId.equals(getCheckpointIdByFileName(fileName))) {
                         try {
-                            fs.delete(new Path(fileName), false);
+                            fs.delete(
+                                    new Path(path + 
DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName),
+                                    false);
                         } catch (Exception e) {
                             log.error(
                                     "Failed to delete checkpoint {} for job 
{}, pipeline {}",
@@ -311,7 +313,9 @@ public class HdfsStorage extends AbstractCheckpointStorage {
                     if (pipelineId.equals(getPipelineIdByFileName(fileName))
                             && 
checkpointIdList.contains(checkpointIdByFileName)) {
                         try {
-                            fs.delete(new Path(fileName), false);
+                            fs.delete(
+                                    new Path(path + 
DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName),
+                                    false);
                         } catch (Exception e) {
                             log.error(
                                     "Failed to delete checkpoint {} for job 
{}, pipeline {}",

Reply via email to