This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8f6ccfb3b [Engine][Checkpoint-Storage]Support S3 protocol (#3675)
8f6ccfb3b is described below

commit 8f6ccfb3b97cb92e37ce61931b0510df4f9da8a6
Author: Kirs <[email protected]>
AuthorDate: Thu Dec 8 17:26:28 2022 +0800

    [Engine][Checkpoint-Storage]Support S3 protocol (#3675)
    
    Support S3 protocol and Local file in hdfs plugin
    Fix HDFS storage issue where a file rename could fail but still report success
---
 .../storage/api/AbstractCheckpointStorage.java     |  2 +-
 .../checkpoint-storage-hdfs/pom.xml                | 18 ++---
 .../checkpoint/storage/hdfs/HdfsStorage.java       | 63 ++++++---------
 .../storage/hdfs/HdfsStorageFactory.java           | 19 +++++
 .../storage/hdfs/common/AbstractConfiguration.java | 64 +++++++++++++++
 .../FileConfiguration.java}                        | 31 ++++++--
 .../storage/hdfs/common/HdfsConfiguration.java     | 91 ++++++++++++++++++++++
 .../storage/hdfs/common/LocalConfiguration.java    | 42 ++++++++++
 .../storage/hdfs/common/S3Configuration.java       | 84 ++++++++++++++++++++
 .../storage/hdfs/AbstractFileCheckPointTest.java   | 84 ++++++++++++++++++++
 .../storage/hdfs/LocalFileCheckPointTest.java}     | 20 ++++-
 .../storage/hdfs/S3FileCheckpointTest.java}        | 33 ++++----
 .../storage/localfile/LocalFileStorageFactory.java |  3 +
 13 files changed, 477 insertions(+), 77 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index eb3593504..ea3925f0c 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -95,7 +95,7 @@ public abstract class AbstractCheckpointStorage implements 
CheckpointStorage {
     }
 
     public String getCheckPointName(PipelineState state) {
-        return System.nanoTime() + FILE_NAME_SPLIT + 
ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT + 
state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + 
FILE_FORMAT;
+        return System.currentTimeMillis() + FILE_NAME_SPLIT + 
ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT + 
state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + 
FILE_FORMAT;
     }
 
     public byte[] serializeCheckPointData(PipelineState state) throws 
IOException {
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
index d02219441..d1e3f584b 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
@@ -31,16 +31,16 @@
     <artifactId>checkpoint-storage-hdfs</artifactId>
     <dependencies>
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-hadoop-2</artifactId>
-            <version>${flink-shaded-hadoop-2.version}</version>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>3.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>3.0.0</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-lang</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 598e5ef8e..cb76a8198 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -25,6 +25,8 @@ import static 
org.apache.seatunnel.engine.checkpoint.storage.constants.StorageCo
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
 import 
org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.AbstractConfiguration;
+import 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.FileConfiguration;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -35,8 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,11 +48,8 @@ import java.util.Set;
 public class HdfsStorage extends AbstractCheckpointStorage {
 
     public FileSystem fs;
-    private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = 
"hadoop.security.authentication";
-
-    private static final String KERBEROS_KEY = "kerberos";
-
     private static final String STORAGE_TMP_SUFFIX = "tmp";
+    private static final String STORAGE_TYPE_KEY = "storage.type";
 
     public HdfsStorage(Map<String, String> configuration) throws 
CheckpointStorageException {
         this.initStorage(configuration);
@@ -60,31 +57,26 @@ public class HdfsStorage extends AbstractCheckpointStorage {
 
     @Override
     public void initStorage(Map<String, String> configuration) throws 
CheckpointStorageException {
-        Configuration hadoopConf = new Configuration();
-        if (configuration.containsKey(HdfsConstants.HDFS_DEF_FS_NAME)) {
-            hadoopConf.set(HdfsConstants.HDFS_DEF_FS_NAME, 
configuration.get(HdfsConstants.HDFS_DEF_FS_NAME));
-        }
         if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
             setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+            configuration.remove(STORAGE_NAME_SPACE);
         }
-        // todo support other config configurations
-        if (configuration.containsKey(HdfsConstants.KERBEROS_PRINCIPAL) && 
configuration.containsKey(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH)) {
-            String kerberosPrincipal = 
configuration.get(HdfsConstants.KERBEROS_PRINCIPAL);
-            String kerberosKeytabFilePath = 
configuration.get(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH);
-            if (StringUtils.isNotBlank(kerberosPrincipal) && 
StringUtils.isNotBlank(kerberosKeytabFilePath)) {
-                hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, 
KERBEROS_KEY);
-                authenticateKerberos(kerberosPrincipal, 
kerberosKeytabFilePath, hadoopConf);
-            }
-        }
-        JobConf jobConf = new JobConf(hadoopConf);
+        Configuration hadoopConf = getConfiguration(configuration);
         try {
-            fs = FileSystem.get(jobConf);
+            fs = FileSystem.get(hadoopConf);
         } catch (IOException e) {
             throw new CheckpointStorageException("Failed to get file system", 
e);
         }
 
     }
 
+    private Configuration getConfiguration(Map<String, String> config) throws 
CheckpointStorageException {
+        String storageType = config.getOrDefault(STORAGE_TYPE_KEY, 
FileConfiguration.LOCAL.toString());
+        config.remove(STORAGE_TYPE_KEY);
+        AbstractConfiguration configuration = 
FileConfiguration.valueOf(storageType.toUpperCase()).getConfiguration(storageType);
+        return configuration.buildConfiguration(config);
+    }
+
     @Override
     public String storeCheckPoint(PipelineState state) throws 
CheckpointStorageException {
         byte[] datas;
@@ -98,10 +90,17 @@ public class HdfsStorage extends AbstractCheckpointStorage {
         Path tmpFilePath = new Path(getStorageParentDirectory() + 
state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
         try (FSDataOutputStream out = fs.create(tmpFilePath)) {
             out.write(datas);
-            out.hsync();
-            fs.rename(tmpFilePath, filePath);
         } catch (IOException e) {
             throw new CheckpointStorageException("Failed to write checkpoint 
data, state: " + state, e);
+        }
+        try {
+            boolean success = fs.rename(tmpFilePath, filePath);
+            if (!success) {
+                throw new CheckpointStorageException("Failed to rename tmp 
file to final file");
+            }
+
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to rename tmp file to 
final file");
         } finally {
             try {
                 // clean up tmp file, if still lying around
@@ -112,6 +111,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
                 log.error("Failed to delete tmp file", ioe);
             }
         }
+
         return filePath.getName();
     }
 
@@ -245,23 +245,6 @@ public class HdfsStorage extends AbstractCheckpointStorage 
{
         });
     }
 
-    /**
-     * Authenticate kerberos
-     *
-     * @param kerberosPrincipal      kerberos principal
-     * @param kerberosKeytabFilePath kerberos keytab file path
-     * @param hdfsConf               hdfs configuration
-     * @throws CheckpointStorageException authentication exception
-     */
-    private void authenticateKerberos(String kerberosPrincipal, String 
kerberosKeytabFilePath, Configuration hdfsConf) throws 
CheckpointStorageException {
-        UserGroupInformation.setConfiguration(hdfsConf);
-        try {
-            UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, 
kerberosKeytabFilePath);
-        } catch (IOException e) {
-            throw new CheckpointStorageException("Failed to login user from 
keytab : " + kerberosKeytabFilePath + " and kerberos principal : " + 
kerberosPrincipal, e);
-        }
-    }
-
     private List<String> getFileNames(String path) throws 
CheckpointStorageException {
         try {
 
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index 791811274..d528dc9f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -30,6 +30,25 @@ import com.google.auto.service.AutoService;
 
 import java.util.Map;
 
+/**
+ * HdfsCheckpointStorageFactory.
+ * if you want to use HdfsCheckpointStorage, you should add the following 
configuration in the configuration file:
+ * <pre>
+ *      storage.type = hdfs # hdfs, local(default),s3
+ *  </pre>
+ * then you need to configure the following parameters by the storage.type:
+ * hdfs  {@link 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.HdfsConfiguration}
+ * local {@link 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.LocalConfiguration}
+ * s3    {@link 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.S3Configuration}
+ * eg: s3
+ * <pre>
+ *      storage.type = "s3"
+ *      access.key = "your access key"
+ *      secret.key = "your secret key"
+ *      s3.bucket= "s3a://your bucket"
+ *      fs.s3a.aws.credentials.provider = 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+ *  </pre>
+ */
 @AutoService(Factory.class)
 public class HdfsStorageFactory implements CheckpointStorageFactory {
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
new file mode 100644
index 000000000..4a28eb80b
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+public abstract class AbstractConfiguration {
+
+    protected static final String HDFS_IMPL_KEY = "impl";
+
+    /**
+     * check the configuration keys
+     *
+     * @param config configuration
+     * @param keys  keys
+     */
+    void checkConfiguration(Map<String, String> config, String... keys) {
+        for (String key : keys) {
+            if (!config.containsKey(key) || null == config.get(key)) {
+                throw new IllegalArgumentException(key + " is required");
+            }
+        }
+    }
+
+    public abstract Configuration buildConfiguration(Map<String, String> 
config) throws CheckpointStorageException;
+
+    /**
+     * set extra options for configuration
+     *
+     * @param hadoopConf hadoop configuration
+     * @param config     extra options
+     * @param prefix     prefix of extra options
+     */
+    void setExtraConfiguration(Configuration hadoopConf, Map<String, String> 
config, String prefix) {
+        config.forEach((k, v) -> {
+            if (k.startsWith(prefix)) {
+                hadoopConf.set(k, v);
+            }
+        });
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
similarity index 53%
copy from 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
copy to 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
index d054d1611..b85acdfef 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
@@ -18,13 +18,34 @@
  *
  */
 
-package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
 
-public class HdfsConstants {
+public enum FileConfiguration {
+    LOCAL("local", new LocalConfiguration()),
+    HDFS("hdfs", new HdfsConfiguration()),
+    S3("s3", new S3Configuration());
 
-    public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+    /**
+     * file system type
+     */
+    private String name;
 
-    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+    /**
+     * file system configuration
+     */
+    private AbstractConfiguration configuration;
+
+    FileConfiguration(String name, AbstractConfiguration configuration) {
+        this.name = name;
+        this.configuration = configuration;
+    }
+
+    public AbstractConfiguration getConfiguration(String name) {
+        return configuration;
+    }
+
+    public String getName() {
+        return name;
+    }
 
-    public static final String KERBEROS_KEYTAB_FILE_PATH = 
"kerberosKeytabFilePath";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
new file mode 100644
index 000000000..dc912c29e
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HdfsConfiguration extends AbstractConfiguration {
+
+    /**
+     * hdfs uri is required
+     */
+    private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+    /**
+     * hdfs kerberos principal( is optional)
+     */
+    private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+    private static final String KERBEROS_KEYTAB_FILE_PATH = 
"kerberosKeytabFilePath";
+    private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = 
"hadoop.security.authentication";
+
+    private static final String KERBEROS_KEY = "kerberos";
+
+    /***********  Hdfs constants  **************/
+    private static final String HDFS_IMPL = 
"org.apache.hadoop.hdfs.DistributedFileSystem";
+    private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+
+    @Override
+    public Configuration buildConfiguration(Map<String, String> config) throws 
CheckpointStorageException {
+        checkConfiguration(config, HDFS_DEF_FS_NAME);
+        Configuration hadoopConf = new Configuration();
+        if (config.containsKey(HDFS_DEF_FS_NAME)) {
+            hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
+        }
+        hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
+        if (config.containsKey(KERBEROS_PRINCIPAL) && 
config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
+            String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
+            String kerberosKeytabFilePath = 
config.get(KERBEROS_KEYTAB_FILE_PATH);
+            if (StringUtils.isNotBlank(kerberosPrincipal) && 
StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+                hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, 
KERBEROS_KEY);
+                authenticateKerberos(kerberosPrincipal, 
kerberosKeytabFilePath, hadoopConf);
+            }
+        }
+        //todo support other hdfs optional config keys
+        return hadoopConf;
+    }
+
+    /**
+     * Authenticate kerberos
+     *
+     * @param kerberosPrincipal      kerberos principal
+     * @param kerberosKeytabFilePath kerberos keytab file path
+     * @param hdfsConf               hdfs configuration
+     * @throws CheckpointStorageException authentication exception
+     */
+    private void authenticateKerberos(String kerberosPrincipal, String 
kerberosKeytabFilePath, Configuration hdfsConf) throws 
CheckpointStorageException {
+        UserGroupInformation.setConfiguration(hdfsConf);
+        try {
+            UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, 
kerberosKeytabFilePath);
+        } catch (IOException e) {
+            throw new CheckpointStorageException("Failed to login user from 
keytab : " + kerberosKeytabFilePath + " and kerberos principal : " + 
kerberosPrincipal, e);
+        }
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
new file mode 100644
index 000000000..998f0b1f0
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+public class LocalConfiguration extends AbstractConfiguration {
+
+    private static final String HDFS_LOCAL_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
+    private static final String HDFS_LOCAL_IMPL_KEY = "fs.file.impl";
+
+    @Override
+    public Configuration buildConfiguration(Map<String, String> config) {
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set(HDFS_LOCAL_IMPL_KEY, HDFS_LOCAL_IMPL);
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, 
config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT));
+        return hadoopConf;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
new file mode 100644
index 000000000..868a0d193
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * S3Configuration
+ * we just support s3n and s3a protocol.
+ * some older Hadoop versions do not support s3a; if you want to use s3a, you 
should check your Hadoop version first.
+ * <p>
+ * access, secret and bucket are required, and the default scheme is s3n.
+ * we use the bucket name to determine the protocol: if you use s3a, the 
bucket name must be s3a://bucket; if you use s3n, the bucket name must be 
s3n://bucket
+ * <p>
+ * other configuration is optional, if you need to set other configuration, 
you can set it in the config
+ * and the parameter name is the same as the hadoop configuration.
+ * <p>
+ * eg: if you want to set the endpoint, you can set it in the config like 
this: config.put("fs.s3a.endpoint", "http://your-endpoint"),
+ * the prefix is fs.s3a and must be the same as the hadoop configuration
+ * <p>
+ * more information about the configuration, please refer to the official 
website:
+ * 
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
+ */
+public class S3Configuration extends AbstractConfiguration {
+
+    /**************** S3 required keys ***************/
+    public static final String S3_ACCESS_KEY = "access.key";
+    public static final String S3_SECRET_KEY = "secret.key";
+    public static final String S3_BUCKET_KEY = "s3.bucket";
+
+
+    /* S3 constants */
+    private static final String HDFS_S3N_IMPL = 
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+    private static final String HDFS_S3A_IMPL = 
"org.apache.hadoop.fs.s3a.S3AFileSystem";
+    private static final String S3A_PROTOCOL = "s3a";
+    private static final String DEFAULT_PROTOCOL = "s3n";
+    private static final String S3_FORMAT_KEY = "fs.%s.%s";
+    private static final String SPLIT_CHAR = ".";
+    private static final String FS_KEY = "fs.";
+
+    @Override
+    public Configuration buildConfiguration(Map<String, String> config) {
+        checkConfiguration(config, S3_ACCESS_KEY, S3_SECRET_KEY, 
S3_BUCKET_KEY);
+        String protocol = DEFAULT_PROTOCOL;
+        if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
+            protocol = S3A_PROTOCOL;
+        }
+        String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : 
HDFS_S3N_IMPL;
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
+        hadoopConf.set(formatKey(protocol, S3_ACCESS_KEY), 
config.get(S3_ACCESS_KEY));
+        hadoopConf.set(formatKey(protocol, S3_SECRET_KEY), 
config.get(S3_SECRET_KEY));
+        hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
+        setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + 
SPLIT_CHAR);
+        return hadoopConf;
+    }
+
+    private String formatKey(String protocol, String key) {
+        return String.format(S3_FORMAT_KEY, protocol, key);
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
new file mode 100644
index 000000000..ebadf614a
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public abstract  class AbstractFileCheckPointTest {
+
+    protected static HdfsStorage STORAGE;
+    protected static final String JOB_ID = "chris";
+    @Test
+    public void testGetAllCheckpoints() throws CheckpointStorageException {
+
+        List<PipelineState> pipelineStates = STORAGE.getAllCheckpoints(JOB_ID);
+        Assertions.assertEquals(3, pipelineStates.size());
+    }
+
+    @Test
+    public void testGetLatestCheckpoints() throws CheckpointStorageException {
+        List<PipelineState> pipelineStates = 
STORAGE.getLatestCheckpoint(JOB_ID);
+        Assertions.assertEquals(2, pipelineStates.size());
+    }
+
+    @Test
+    public void testGetLatestCheckpointByJobIdAndPipelineId() throws 
CheckpointStorageException {
+        PipelineState state = 
STORAGE.getLatestCheckpointByJobIdAndPipelineId(JOB_ID, "1");
+        Assertions.assertEquals(2, state.getCheckpointId());
+    }
+
+    @Test
+    public void testGetCheckpointsByJobIdAndPipelineId() throws 
CheckpointStorageException {
+        List<PipelineState> state = 
STORAGE.getCheckpointsByJobIdAndPipelineId(JOB_ID, "1");
+        Assertions.assertEquals(2, state.size());
+    }
+
+    @AfterAll
+    public static void teardown() {
+        STORAGE.deleteCheckpoint(JOB_ID);
+    }
+
+    /**
+     * init storage data
+     * @throws CheckpointStorageException exception if init failed
+     */
+    protected static void initStorageData() throws CheckpointStorageException {
+        PipelineState pipelineState = PipelineState.builder()
+            .jobId(JOB_ID)
+            .pipelineId(1)
+            .checkpointId(1)
+            .states(new byte[0])
+            .build();
+        STORAGE.storeCheckPoint(pipelineState);
+        pipelineState.setCheckpointId(2);
+        STORAGE.storeCheckPoint(pipelineState);
+        pipelineState.setPipelineId(2);
+        pipelineState.setCheckpointId(3);
+        STORAGE.storeCheckPoint(pipelineState);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
similarity index 59%
rename from 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
rename to 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
index d054d1611..14c6a7aea 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
@@ -20,11 +20,23 @@
 
 package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
 
-public class HdfsConstants {
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 
-    public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 
-    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+import java.util.HashMap;
+
+@EnabledOnOs({OS.LINUX, OS.MAC})
+public class LocalFileCheckPointTest extends AbstractFileCheckPointTest {
+
+    @BeforeAll
+    public static void setup() throws CheckpointStorageException {
+        HashMap config = new HashMap();
+        config.put("storageNameSpace", "/tmp/");
+        STORAGE = new HdfsStorage(config);
+        initStorageData();
+    }
 
-    public static final String KERBEROS_KEYTAB_FILE_PATH = 
"kerberosKeytabFilePath";
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
similarity index 55%
copy from 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
copy to 
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
index 791811274..c57894e2a 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
@@ -20,30 +20,27 @@
 
 package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
 
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import 
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 
-import com.google.auto.service.AutoService;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 
+import java.util.HashMap;
 import java.util.Map;
 
-@AutoService(Factory.class)
-public class HdfsStorageFactory implements CheckpointStorageFactory {
-    @Override
-    public String factoryIdentifier() {
-        return "hdfs";
-    }
+@Disabled("S3 is not available in CI, if you want to run this test, please set 
up your own S3 environment")
+public class S3FileCheckpointTest extends AbstractFileCheckPointTest {
 
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder().build();
+    @BeforeAll
+    public static void setup() throws CheckpointStorageException {
+        Map<String, String> config = new HashMap<>();
+        config.put("storage.type", "s3");
+        config.put("access.key", "your access key");
+        config.put("secret.key", "your secret key");
+        config.put("s3.bucket", "s3a://calvin.test.cn");
+        config.put("fs.s3a.aws.credentials.provider", 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+        STORAGE = new HdfsStorage(config);
+        initStorageData();
     }
 
-    @Override
-    public CheckpointStorage create(Map<String, String> configuration) throws 
CheckpointStorageException {
-        return new HdfsStorage(configuration);
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
index 76c7b6e9d..090ce61ce 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
@@ -32,7 +32,10 @@ import java.util.Map;
 /**
  * Local file storage plug-in, use local file storage,
  * only suitable for single-machine testing or small data scale use, use with 
caution in production environment
+ * <p>
+ * deprecated: use @see 
org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorageFactory instead
  */
+@Deprecated
 @AutoService(Factory.class)
 public class LocalFileStorageFactory implements CheckpointStorageFactory {
 


Reply via email to