This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8f6ccfb3b [Engine][Checkpoint-Storage]Support S3 protocol (#3675)
8f6ccfb3b is described below
commit 8f6ccfb3b97cb92e37ce61931b0510df4f9da8a6
Author: Kirs <[email protected]>
AuthorDate: Thu Dec 8 17:26:28 2022 +0800
[Engine][Checkpoint-Storage]Support S3 protocol (#3675)
Support S3 protocol and Local file in hdfs plugin
Fix Hdfs storage may have file rename failure but return success
---
.../storage/api/AbstractCheckpointStorage.java | 2 +-
.../checkpoint-storage-hdfs/pom.xml | 18 ++---
.../checkpoint/storage/hdfs/HdfsStorage.java | 63 ++++++---------
.../storage/hdfs/HdfsStorageFactory.java | 19 +++++
.../storage/hdfs/common/AbstractConfiguration.java | 64 +++++++++++++++
.../FileConfiguration.java} | 31 ++++++--
.../storage/hdfs/common/HdfsConfiguration.java | 91 ++++++++++++++++++++++
.../storage/hdfs/common/LocalConfiguration.java | 42 ++++++++++
.../storage/hdfs/common/S3Configuration.java | 84 ++++++++++++++++++++
.../storage/hdfs/AbstractFileCheckPointTest.java | 84 ++++++++++++++++++++
.../storage/hdfs/LocalFileCheckPointTest.java} | 20 ++++-
.../storage/hdfs/S3FileCheckpointTest.java} | 33 ++++----
.../storage/localfile/LocalFileStorageFactory.java | 3 +
13 files changed, 477 insertions(+), 77 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index eb3593504..ea3925f0c 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -95,7 +95,7 @@ public abstract class AbstractCheckpointStorage implements
CheckpointStorage {
}
public String getCheckPointName(PipelineState state) {
- return System.nanoTime() + FILE_NAME_SPLIT +
ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT +
state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." +
FILE_FORMAT;
+ return System.currentTimeMillis() + FILE_NAME_SPLIT +
ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT +
state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." +
FILE_FORMAT;
}
public byte[] serializeCheckPointData(PipelineState state) throws
IOException {
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
index d02219441..d1e3f584b 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
@@ -31,16 +31,16 @@
<artifactId>checkpoint-storage-hdfs</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-hadoop-2</artifactId>
- <version>${flink-shaded-hadoop-2.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>3.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.0.0</version>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
index 598e5ef8e..cb76a8198 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -25,6 +25,8 @@ import static
org.apache.seatunnel.engine.checkpoint.storage.constants.StorageCo
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import
org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.AbstractConfiguration;
+import
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.FileConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -35,8 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,11 +48,8 @@ import java.util.Set;
public class HdfsStorage extends AbstractCheckpointStorage {
public FileSystem fs;
- private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
"hadoop.security.authentication";
-
- private static final String KERBEROS_KEY = "kerberos";
-
private static final String STORAGE_TMP_SUFFIX = "tmp";
+ private static final String STORAGE_TYPE_KEY = "storage.type";
public HdfsStorage(Map<String, String> configuration) throws
CheckpointStorageException {
this.initStorage(configuration);
@@ -60,31 +57,26 @@ public class HdfsStorage extends AbstractCheckpointStorage {
@Override
public void initStorage(Map<String, String> configuration) throws
CheckpointStorageException {
- Configuration hadoopConf = new Configuration();
- if (configuration.containsKey(HdfsConstants.HDFS_DEF_FS_NAME)) {
- hadoopConf.set(HdfsConstants.HDFS_DEF_FS_NAME,
configuration.get(HdfsConstants.HDFS_DEF_FS_NAME));
- }
if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+ configuration.remove(STORAGE_NAME_SPACE);
}
- // todo support other config configurations
- if (configuration.containsKey(HdfsConstants.KERBEROS_PRINCIPAL) &&
configuration.containsKey(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH)) {
- String kerberosPrincipal =
configuration.get(HdfsConstants.KERBEROS_PRINCIPAL);
- String kerberosKeytabFilePath =
configuration.get(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH);
- if (StringUtils.isNotBlank(kerberosPrincipal) &&
StringUtils.isNotBlank(kerberosKeytabFilePath)) {
- hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY,
KERBEROS_KEY);
- authenticateKerberos(kerberosPrincipal,
kerberosKeytabFilePath, hadoopConf);
- }
- }
- JobConf jobConf = new JobConf(hadoopConf);
+ Configuration hadoopConf = getConfiguration(configuration);
try {
- fs = FileSystem.get(jobConf);
+ fs = FileSystem.get(hadoopConf);
} catch (IOException e) {
throw new CheckpointStorageException("Failed to get file system",
e);
}
}
+ private Configuration getConfiguration(Map<String, String> config) throws
CheckpointStorageException {
+ String storageType = config.getOrDefault(STORAGE_TYPE_KEY,
FileConfiguration.LOCAL.toString());
+ config.remove(STORAGE_TYPE_KEY);
+ AbstractConfiguration configuration =
FileConfiguration.valueOf(storageType.toUpperCase()).getConfiguration(storageType);
+ return configuration.buildConfiguration(config);
+ }
+
@Override
public String storeCheckPoint(PipelineState state) throws
CheckpointStorageException {
byte[] datas;
@@ -98,10 +90,17 @@ public class HdfsStorage extends AbstractCheckpointStorage {
Path tmpFilePath = new Path(getStorageParentDirectory() +
state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
try (FSDataOutputStream out = fs.create(tmpFilePath)) {
out.write(datas);
- out.hsync();
- fs.rename(tmpFilePath, filePath);
} catch (IOException e) {
throw new CheckpointStorageException("Failed to write checkpoint
data, state: " + state, e);
+ }
+ try {
+ boolean success = fs.rename(tmpFilePath, filePath);
+ if (!success) {
+ throw new CheckpointStorageException("Failed to rename tmp
file to final file");
+ }
+
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to rename tmp file to
final file");
} finally {
try {
// clean up tmp file, if still lying around
@@ -112,6 +111,7 @@ public class HdfsStorage extends AbstractCheckpointStorage {
log.error("Failed to delete tmp file", ioe);
}
}
+
return filePath.getName();
}
@@ -245,23 +245,6 @@ public class HdfsStorage extends AbstractCheckpointStorage
{
});
}
- /**
- * Authenticate kerberos
- *
- * @param kerberosPrincipal kerberos principal
- * @param kerberosKeytabFilePath kerberos keytab file path
- * @param hdfsConf hdfs configuration
- * @throws CheckpointStorageException authentication exception
- */
- private void authenticateKerberos(String kerberosPrincipal, String
kerberosKeytabFilePath, Configuration hdfsConf) throws
CheckpointStorageException {
- UserGroupInformation.setConfiguration(hdfsConf);
- try {
- UserGroupInformation.loginUserFromKeytab(kerberosPrincipal,
kerberosKeytabFilePath);
- } catch (IOException e) {
- throw new CheckpointStorageException("Failed to login user from
keytab : " + kerberosKeytabFilePath + " and kerberos principal : " +
kerberosPrincipal, e);
- }
- }
-
private List<String> getFileNames(String path) throws
CheckpointStorageException {
try {
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index 791811274..d528dc9f5 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -30,6 +30,25 @@ import com.google.auto.service.AutoService;
import java.util.Map;
+/**
+ * HdfsCheckpointStorageFactory.
+ * if you want to use HdfsCheckpointStorage, you should add the following
configuration in the configuration file:
+ * <pre>
+ * storage.type = hdfs # hdfs, local(default),s3
+ * </pre>
+ * then you need to configure the following parameters by the storage.type:
+ * hdfs {@link
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.HdfsConfiguration}
+ * local {@link
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.LocalConfiguration}
+ * s3 {@link
org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.S3Configuration}
+ * eg: s3
+ * <pre>
+ * storage.type = "s3"
+ * access.key = "your access key"
+ * secret.key = "your secret key"
+ * s3.bucket= "s3a://your bucket"
+ * fs.s3a.aws.credentials.provider =
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+ * </pre>
+ */
@AutoService(Factory.class)
public class HdfsStorageFactory implements CheckpointStorageFactory {
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
new file mode 100644
index 000000000..4a28eb80b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+public abstract class AbstractConfiguration {
+
+ protected static final String HDFS_IMPL_KEY = "impl";
+
+ /**
+ * check the configuration keys
+ *
+ * @param config configuration
+ * @param keys keys
+ */
+ void checkConfiguration(Map<String, String> config, String... keys) {
+ for (String key : keys) {
+ if (!config.containsKey(key) || null == config.get(key)) {
+ throw new IllegalArgumentException(key + " is required");
+ }
+ }
+ }
+
+ public abstract Configuration buildConfiguration(Map<String, String>
config) throws CheckpointStorageException;
+
+ /**
+ * set extra options for configuration
+ *
+ * @param hadoopConf hadoop configuration
+ * @param config extra options
+ * @param prefix prefix of extra options
+ */
+ void setExtraConfiguration(Configuration hadoopConf, Map<String, String>
config, String prefix) {
+ config.forEach((k, v) -> {
+ if (k.startsWith(prefix)) {
+ hadoopConf.set(k, v);
+ }
+ });
+ }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
similarity index 53%
copy from
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
copy to
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
index d054d1611..b85acdfef 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/FileConfiguration.java
@@ -18,13 +18,34 @@
*
*/
-package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
-public class HdfsConstants {
+public enum FileConfiguration {
+ LOCAL("local", new LocalConfiguration()),
+ HDFS("hdfs", new HdfsConfiguration()),
+ S3("s3", new S3Configuration());
- public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+ /**
+ * file system type
+ */
+ private String name;
- public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+ /**
+ * file system configuration
+ */
+ private AbstractConfiguration configuration;
+
+ FileConfiguration(String name, AbstractConfiguration configuration) {
+ this.name = name;
+ this.configuration = configuration;
+ }
+
+ public AbstractConfiguration getConfiguration(String name) {
+ return configuration;
+ }
+
+ public String getName() {
+ return name;
+ }
- public static final String KERBEROS_KEYTAB_FILE_PATH =
"kerberosKeytabFilePath";
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
new file mode 100644
index 000000000..dc912c29e
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class HdfsConfiguration extends AbstractConfiguration {
+
+ /**
+ * hdfs uri is required
+ */
+ private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+ /**
+ * hdfs kerberos principal (optional)
+ */
+ private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+ private static final String KERBEROS_KEYTAB_FILE_PATH =
"kerberosKeytabFilePath";
+ private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
"hadoop.security.authentication";
+
+ private static final String KERBEROS_KEY = "kerberos";
+
+ /*********** Hdfs constants **************/
+ private static final String HDFS_IMPL =
"org.apache.hadoop.hdfs.DistributedFileSystem";
+ private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+
+ @Override
+ public Configuration buildConfiguration(Map<String, String> config) throws
CheckpointStorageException {
+ checkConfiguration(config, HDFS_DEF_FS_NAME);
+ Configuration hadoopConf = new Configuration();
+ if (config.containsKey(HDFS_DEF_FS_NAME)) {
+ hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
+ }
+ hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
+ hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
+ if (config.containsKey(KERBEROS_PRINCIPAL) &&
config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
+ String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
+ String kerberosKeytabFilePath =
config.get(KERBEROS_KEYTAB_FILE_PATH);
+ if (StringUtils.isNotBlank(kerberosPrincipal) &&
StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+ hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY,
KERBEROS_KEY);
+ authenticateKerberos(kerberosPrincipal,
kerberosKeytabFilePath, hadoopConf);
+ }
+ }
+ //todo support other hdfs optional config keys
+ return hadoopConf;
+ }
+
+ /**
+ * Authenticate kerberos
+ *
+ * @param kerberosPrincipal kerberos principal
+ * @param kerberosKeytabFilePath kerberos keytab file path
+ * @param hdfsConf hdfs configuration
+ * @throws CheckpointStorageException authentication exception
+ */
+ private void authenticateKerberos(String kerberosPrincipal, String
kerberosKeytabFilePath, Configuration hdfsConf) throws
CheckpointStorageException {
+ UserGroupInformation.setConfiguration(hdfsConf);
+ try {
+ UserGroupInformation.loginUserFromKeytab(kerberosPrincipal,
kerberosKeytabFilePath);
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to login user from
keytab : " + kerberosKeytabFilePath + " and kerberos principal : " +
kerberosPrincipal, e);
+ }
+ }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
new file mode 100644
index 000000000..998f0b1f0
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+public class LocalConfiguration extends AbstractConfiguration {
+
+ private static final String HDFS_LOCAL_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
+ private static final String HDFS_LOCAL_IMPL_KEY = "fs.file.impl";
+
+ @Override
+ public Configuration buildConfiguration(Map<String, String> config) {
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set(HDFS_LOCAL_IMPL_KEY, HDFS_LOCAL_IMPL);
+ hadoopConf.set(FS_DEFAULT_NAME_KEY,
config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT));
+ return hadoopConf;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
new file mode 100644
index 000000000..868a0d193
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs.common;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/**
+ * S3Configuration
+ * we just support s3n and s3a protocol.
+ * some hadoop low version not support s3a, if you want to use s3a, you should
check your hadoop version first.
+ * <p>
+ * access, secret and bucket are required, and the default scheme is s3n
+ * we use the bucket name to determine the protocol: if you use s3a, the
bucket name must be s3a://bucket; if you use s3n, the bucket name must be
s3n://bucket
+ * <p>
+ * other configuration is optional, if you need to set other configuration,
you can set it in the config
+ * and the parameter name is the same as the hadoop configuration.
+ * <p>
+ * eg: if you want to set the endpoint, you can set it in the config like
this: config.put("fs.s3a.endpoint", "http://your-endpoint"),
+ * the prefix is fs.s3a and must be the same as the hadoop configuration
+ * <p>
+ * more information about the configuration, please refer to the official
website:
+ *
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
+ */
+public class S3Configuration extends AbstractConfiguration {
+
+ /**************** S3 required keys ***************/
+ public static final String S3_ACCESS_KEY = "access.key";
+ public static final String S3_SECRET_KEY = "secret.key";
+ public static final String S3_BUCKET_KEY = "s3.bucket";
+
+
+ /* S3 constants */
+ private static final String HDFS_S3N_IMPL =
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+ private static final String HDFS_S3A_IMPL =
"org.apache.hadoop.fs.s3a.S3AFileSystem";
+ private static final String S3A_PROTOCOL = "s3a";
+ private static final String DEFAULT_PROTOCOL = "s3n";
+ private static final String S3_FORMAT_KEY = "fs.%s.%s";
+ private static final String SPLIT_CHAR = ".";
+ private static final String FS_KEY = "fs.";
+
+ @Override
+ public Configuration buildConfiguration(Map<String, String> config) {
+ checkConfiguration(config, S3_ACCESS_KEY, S3_SECRET_KEY,
S3_BUCKET_KEY);
+ String protocol = DEFAULT_PROTOCOL;
+ if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
+ protocol = S3A_PROTOCOL;
+ }
+ String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL :
HDFS_S3N_IMPL;
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
+ hadoopConf.set(formatKey(protocol, S3_ACCESS_KEY),
config.get(S3_ACCESS_KEY));
+ hadoopConf.set(formatKey(protocol, S3_SECRET_KEY),
config.get(S3_SECRET_KEY));
+ hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
+ setExtraConfiguration(hadoopConf, config, FS_KEY + protocol +
SPLIT_CHAR);
+ return hadoopConf;
+ }
+
+ private String formatKey(String protocol, String key) {
+ return String.format(S3_FORMAT_KEY, protocol, key);
+ }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
new file mode 100644
index 000000000..ebadf614a
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/AbstractFileCheckPointTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public abstract class AbstractFileCheckPointTest {
+
+ protected static HdfsStorage STORAGE;
+ protected static final String JOB_ID = "chris";
+ @Test
+ public void testGetAllCheckpoints() throws CheckpointStorageException {
+
+ List<PipelineState> pipelineStates = STORAGE.getAllCheckpoints(JOB_ID);
+ Assertions.assertEquals(3, pipelineStates.size());
+ }
+
+ @Test
+ public void testGetLatestCheckpoints() throws CheckpointStorageException {
+ List<PipelineState> pipelineStates =
STORAGE.getLatestCheckpoint(JOB_ID);
+ Assertions.assertEquals(2, pipelineStates.size());
+ }
+
+ @Test
+ public void testGetLatestCheckpointByJobIdAndPipelineId() throws
CheckpointStorageException {
+ PipelineState state =
STORAGE.getLatestCheckpointByJobIdAndPipelineId(JOB_ID, "1");
+ Assertions.assertEquals(2, state.getCheckpointId());
+ }
+
+ @Test
+ public void testGetCheckpointsByJobIdAndPipelineId() throws
CheckpointStorageException {
+ List<PipelineState> state =
STORAGE.getCheckpointsByJobIdAndPipelineId(JOB_ID, "1");
+ Assertions.assertEquals(2, state.size());
+ }
+
+ @AfterAll
+ public static void teardown() {
+ STORAGE.deleteCheckpoint(JOB_ID);
+ }
+
+ /**
+ * init storage data
+ * @throws CheckpointStorageException exception if init failed
+ */
+ protected static void initStorageData() throws CheckpointStorageException {
+ PipelineState pipelineState = PipelineState.builder()
+ .jobId(JOB_ID)
+ .pipelineId(1)
+ .checkpointId(1)
+ .states(new byte[0])
+ .build();
+ STORAGE.storeCheckPoint(pipelineState);
+ pipelineState.setCheckpointId(2);
+ STORAGE.storeCheckPoint(pipelineState);
+ pipelineState.setPipelineId(2);
+ pipelineState.setCheckpointId(3);
+ STORAGE.storeCheckPoint(pipelineState);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
similarity index 59%
rename from
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
rename to
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
index d054d1611..14c6a7aea 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
@@ -20,11 +20,23 @@
package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
-public class HdfsConstants {
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
- public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+import org.junit.jupiter.api.condition.OS;
- public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+import java.util.HashMap;
+
+@EnabledOnOs({OS.LINUX, OS.MAC})
+public class LocalFileCheckPointTest extends AbstractFileCheckPointTest {
+
+ @BeforeAll
+ public static void setup() throws CheckpointStorageException {
+ HashMap config = new HashMap();
+ config.put("storageNameSpace", "/tmp/");
+ STORAGE = new HdfsStorage(config);
+ initStorageData();
+ }
- public static final String KERBEROS_KEYTAB_FILE_PATH =
"kerberosKeytabFilePath";
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
similarity index 55%
copy from
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
copy to
seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
index 791811274..c57894e2a 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
@@ -20,30 +20,27 @@
package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
-import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
-import com.google.auto.service.AutoService;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import java.util.HashMap;
import java.util.Map;
-@AutoService(Factory.class)
-public class HdfsStorageFactory implements CheckpointStorageFactory {
- @Override
- public String factoryIdentifier() {
- return "hdfs";
- }
+@Disabled("S3 is not available in CI, if you want to run this test, please set
up your own S3 environment")
+public class S3FileCheckpointTest extends AbstractFileCheckPointTest {
- @Override
- public OptionRule optionRule() {
- return OptionRule.builder().build();
+ @BeforeAll
+ public static void setup() throws CheckpointStorageException {
+ Map<String, String> config = new HashMap<>();
+ config.put("storage.type", "s3");
+ config.put("access.key", "your access key");
+ config.put("secret.key", "your secret key");
+ config.put("s3.bucket", "s3a://calvin.test.cn");
+ config.put("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ STORAGE = new HdfsStorage(config);
+ initStorageData();
}
- @Override
- public CheckpointStorage create(Map<String, String> configuration) throws
CheckpointStorageException {
- return new HdfsStorage(configuration);
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
index 76c7b6e9d..090ce61ce 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
@@ -32,7 +32,10 @@ import java.util.Map;
/**
* Local file storage plug-in, use local file storage,
* only suitable for single-machine testing or small data scale use, use with
caution in production environment
+ * <p>
+ * deprecated: use @see
org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorageFactory instead
*/
+@Deprecated
@AutoService(Factory.class)
public class LocalFileStorageFactory implements CheckpointStorageFactory {