This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f1880394ab [Feature][Zeta] Checkpoint support hdfs ha mode (#4942)
f1880394ab is described below
commit f1880394abb6015b85bc74692051b3ed13eb8f79
Author: Koyfin <[email protected]>
AuthorDate: Mon Aug 14 11:11:19 2023 +0800
[Feature][Zeta] Checkpoint support hdfs ha mode (#4942)
---
docs/en/seatunnel-engine/checkpoint-storage.md | 22 ++++++++++
.../storage/hdfs/common/HdfsConfiguration.java | 12 +++++-
.../storage/hdfs/HDFSFileCheckpointTest.java | 50 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md
b/docs/en/seatunnel-engine/checkpoint-storage.md
index afe1fa6bc1..f2a6487f28 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -146,6 +146,28 @@ seatunnel:
kerberosKeytab: your-kerberos-keytab
```
+If HDFS is in HA mode, you can configure it like this:
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ fs.defaultFS: hdfs://usdp-bing
+ seatunnel.hadoop.dfs.nameservices: usdp-bing
+ seatunnel.hadoop.dfs.ha.namenodes.usdp-bing: nn1,nn2
+ seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1:
usdp-bing-nn1:8020
+ seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2:
usdp-bing-nn2:8020
+ seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing:
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+```
+
+If HDFS has other settings in `hdfs-site.xml` or `core-site.xml`, set them here
by adding the `seatunnel.hadoop.` prefix to each HDFS config key.
+
#### LocalFile
```yaml
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 8d41ae848d..953da3027b 100644
---
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -49,6 +49,8 @@ public class HdfsConfiguration extends AbstractConfiguration {
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+ private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
+
@Override
public Configuration buildConfiguration(Map<String, String> config)
throws CheckpointStorageException {
@@ -69,7 +71,15 @@ public class HdfsConfiguration extends AbstractConfiguration
{
authenticateKerberos(kerberosPrincipal,
kerberosKeytabFilePath, hadoopConf);
}
}
- // todo support other hdfs optional config keys
+ // support other hdfs optional config keys
+ config.entrySet().stream()
+ .filter(entry ->
entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
+ .forEach(
+ entry -> {
+ String key =
entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
+ String value = entry.getValue();
+ hadoopConf.set(key, value);
+ });
return hadoopConf;
}
diff --git
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
new file mode 100644
index 0000000000..23a41a2782
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checkpoint storage test that targets an HDFS cluster configured in HA mode.
+ *
+ * <p>The class is disabled because it needs a reachable HDFS environment. The
+ * test cases themselves are presumably inherited from
+ * AbstractFileCheckPointTest - confirm against that class.
+ */
+@Disabled(
+ "HDFS is not available in CI, if you want to run this test, please set
up your own HDFS environment")
+public class HDFSFileCheckpointTest extends AbstractFileCheckPointTest {
+
+ /**
+ * Builds an HA-mode HDFS plugin config, creates the HdfsStorage under test
+ * and then calls initStorageData().
+ *
+ * @throws CheckpointStorageException declared by this method; presumably raised
+ *     when the storage cannot be created or initialised - confirm
+ */
+ @BeforeAll
+ public static void setup() throws CheckpointStorageException {
+ Map<String, String> config = new HashMap<>();
+ config.put("storage.type", "hdfs");
+ // fs.defaultFS names the logical nameservice rather than a single namenode host.
+ config.put("fs.defaultFS", "hdfs://usdp-bing");
+ // Keys with the "seatunnel.hadoop." prefix carry the HDFS HA client settings.
+ config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
+ config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
+ config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1",
"usdp-bing-nn1:8020");
+ config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2",
"usdp-bing-nn2:8020");
+ config.put(
+
"seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing",
+
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ // STORAGE and initStorageData() are not declared here; presumably inherited
+ // from AbstractFileCheckPointTest - confirm.
+ STORAGE = new HdfsStorage(config);
+ initStorageData();
+ }
+}