This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f1880394ab [Feature][Zeta] Checkpoint support hdfs ha mode (#4942)
f1880394ab is described below

commit f1880394abb6015b85bc74692051b3ed13eb8f79
Author: Koyfin <[email protected]>
AuthorDate: Mon Aug 14 11:11:19 2023 +0800

    [Feature][Zeta] Checkpoint support hdfs ha mode (#4942)
---
 docs/en/seatunnel-engine/checkpoint-storage.md     | 22 ++++++++++
 .../storage/hdfs/common/HdfsConfiguration.java     | 12 +++++-
 .../storage/hdfs/HDFSFileCheckpointTest.java       | 50 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md 
b/docs/en/seatunnel-engine/checkpoint-storage.md
index afe1fa6bc1..f2a6487f28 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -146,6 +146,28 @@ seatunnel:
           kerberosKeytab: your-kerberos-keytab  
 ```
 
+If HDFS is in HA mode, you can configure it like this:
+
+```yaml
+seatunnel:
+  engine:
+    checkpoint:
+      storage:
+        type: hdfs
+        max-retained: 3
+        plugin-config:
+          storage.type: hdfs
+          fs.defaultFS: hdfs://usdp-bing
+          seatunnel.hadoop.dfs.nameservices: usdp-bing
+          seatunnel.hadoop.dfs.ha.namenodes.usdp-bing: nn1,nn2
+          seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1: 
usdp-bing-nn1:8020
+          seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2: 
usdp-bing-nn2:8020
+          seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+```
+
+If HDFS has other configs in `hdfs-site.xml` or `core-site.xml`, just 
set them by using the `seatunnel.hadoop.` prefix.
+
 #### LocalFile
 
 ```yaml
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 8d41ae848d..953da3027b 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -49,6 +49,8 @@ public class HdfsConfiguration extends AbstractConfiguration {
 
     private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
 
+    private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
+
     @Override
     public Configuration buildConfiguration(Map<String, String> config)
             throws CheckpointStorageException {
@@ -69,7 +71,15 @@ public class HdfsConfiguration extends AbstractConfiguration 
{
                 authenticateKerberos(kerberosPrincipal, 
kerberosKeytabFilePath, hadoopConf);
             }
         }
-        // todo support other hdfs optional config keys
+        //  support other hdfs optional config keys
+        config.entrySet().stream()
+                .filter(entry -> 
entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
+                .forEach(
+                        entry -> {
+                            String key = 
entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
+                            String value = entry.getValue();
+                            hadoopConf.set(key, value);
+                        });
         return hadoopConf;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
new file mode 100644
index 0000000000..23a41a2782
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Disabled(
+        "HDFS is not available in CI, if you want to run this test, please set 
up your own HDFS environment")
+public class HDFSFileCheckpointTest extends AbstractFileCheckPointTest {
+
+    @BeforeAll
+    public static void setup() throws CheckpointStorageException {
+        Map<String, String> config = new HashMap<>();
+        config.put("storage.type", "hdfs");
+        config.put("fs.defaultFS", "hdfs://usdp-bing");
+        config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
+        config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
+        config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1", 
"usdp-bing-nn1:8020");
+        config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2", 
"usdp-bing-nn2:8020");
+        config.put(
+                
"seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing",
+                
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+        STORAGE = new HdfsStorage(config);
+        initStorageData();
+    }
+}

Reply via email to