This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 53c8957d51 [feature][CheckPoint-stroage]:Added Disable cache
configuration. (#6718)
53c8957d51 is described below
commit 53c8957d51de566967095bfbfd22dc8f60a2afc3
Author: Leon Yoah <[email protected]>
AuthorDate: Mon Apr 29 11:01:33 2024 +0800
[feature][CheckPoint-stroage]:Added Disable cache configuration. (#6718)
---
docs/en/seatunnel-engine/checkpoint-storage.md | 37 ++++++++++++++++++++++
docs/zh/seatunnel-engine/checkpoint-storage.md | 36 +++++++++++++++++++++
.../storage/hdfs/common/AbstractConfiguration.java | 5 +++
.../storage/hdfs/common/HdfsConfiguration.java | 4 +++
.../storage/hdfs/common/LocalConfiguration.java | 5 +++
.../storage/hdfs/common/OssConfiguration.java | 4 +++
.../storage/hdfs/common/S3Configuration.java | 4 +++
.../storage/hdfs/HDFSFileCheckpointTest.java | 2 +-
.../storage/hdfs/LocalFileCheckPointTest.java | 1 +
.../storage/hdfs/OssFileCheckpointTest.java | 1 +
.../storage/hdfs/S3FileCheckpointTest.java | 1 +
11 files changed, 99 insertions(+), 1 deletion(-)
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md
b/docs/en/seatunnel-engine/checkpoint-storage.md
index 1e9a9332a5..13e1721371 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -208,3 +208,40 @@ seatunnel:
```
+### Enable cache
+
+When storage:type is hdfs, cache is disabled by default. If you want to enable
it, set `disable.cache: false`
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: hdfs:///
+
+```
+
+or
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: file:///
+```
+
diff --git a/docs/zh/seatunnel-engine/checkpoint-storage.md
b/docs/zh/seatunnel-engine/checkpoint-storage.md
index 2ce3be53ae..a54492689e 100644
--- a/docs/zh/seatunnel-engine/checkpoint-storage.md
+++ b/docs/zh/seatunnel-engine/checkpoint-storage.md
@@ -184,3 +184,39 @@ seatunnel:
```
+### 开启高速缓存
+
+当storage:type为hdfs时,默认关闭cache。如果您想启用它,请设置为`disable.cache: false`。
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: hdfs:/// # Ensure that the directory has written
permission
+```
+
+or
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: file:///
+```
+
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
index a6a20370a5..4a76c7eafe 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
@@ -30,6 +30,11 @@ public abstract class AbstractConfiguration {
protected static final String HDFS_IMPL_KEY = "impl";
+ protected static final String COMMON_DISABLE_CACHE = "%s.disable.cache";
+
+ protected static final String DISABLE_CACHE_DEFAULT_VALUE = "TRUE";
+
+ protected static final String DISABLE_CACHE_KEY = "disable.cache";
/**
* check the configuration keys
*
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 2da4c6ad5f..0ffb95b889 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -77,6 +77,10 @@ public class HdfsConfiguration extends AbstractConfiguration
{
if (config.containsKey(HDFS_SITE_PATH)) {
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
}
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, HDFS_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY,
DISABLE_CACHE_DEFAULT_VALUE)));
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry ->
entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
index b03e91b99d..af11a592ae 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
@@ -39,6 +39,11 @@ public class LocalConfiguration extends
AbstractConfiguration {
hadoopConf.set(
FS_DEFAULT_NAME_KEY,
config.getOrDefault(FS_DEFAULT_NAME_KEY,
FS_DEFAULT_NAME_DEFAULT));
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, HDFS_LOCAL_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY,
DISABLE_CACHE_DEFAULT_VALUE)));
+
return hadoopConf;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
index 08aef21ebe..b87ab27023 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
@@ -43,6 +43,10 @@ public class OssConfiguration extends AbstractConfiguration {
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, OSS_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY,
DISABLE_CACHE_DEFAULT_VALUE)));
setExtraConfiguration(hadoopConf, config, OSS_KEY);
return hadoopConf;
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
index ea3ee0fdde..4041af7f23 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
@@ -69,6 +69,10 @@ public class S3Configuration extends AbstractConfiguration {
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, formatKey(protocol,
HDFS_IMPL_KEY)),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY,
DISABLE_CACHE_DEFAULT_VALUE)));
setExtraConfiguration(hadoopConf, config, FS_KEY + protocol +
SPLIT_CHAR);
return hadoopConf;
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
index 23a41a2782..a085acbded 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
@@ -36,7 +36,7 @@ public class HDFSFileCheckpointTest extends
AbstractFileCheckPointTest {
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "hdfs");
- config.put("fs.defaultFS", "hdfs://usdp-bing");
+ config.put("disable.cache", "false");
config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1",
"usdp-bing-nn1:8020");
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
index 94d058cc54..f441364a2e 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
@@ -35,6 +35,7 @@ public class LocalFileCheckPointTest extends
AbstractFileCheckPointTest {
public static void setup() throws CheckpointStorageException {
HashMap config = new HashMap();
config.put("namespace", "/tmp/");
+ config.put("disable.cache", "false");
STORAGE = new HdfsStorage(config);
initStorageData();
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
index e9de3c0e1b..3d7299c266 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
@@ -35,6 +35,7 @@ public class OssFileCheckpointTest extends
AbstractFileCheckPointTest {
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "oss");
+ config.put("disable.cache", "false");
config.put("fs.oss.accessKeyId", "your access key id");
config.put("fs.oss.accessKeySecret", "your access key secret");
config.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
index c9657a5468..fb7b2a1c21 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
@@ -36,6 +36,7 @@ public class S3FileCheckpointTest extends
AbstractFileCheckPointTest {
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "s3");
+ config.put("disable.cache", "false");
config.put("fs.s3a.access.key", "your access key");
config.put("fs.s3a.secret.key", "your secret key");
config.put("s3.bucket", "s3a://calvin.test.cn");