This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c49a62506 [Engine][Checkpoint]Allow users to choose their own storage 
plugins (#3683)
c49a62506 is described below

commit c49a6250613c3f8da888c1c6a4bf3142af8a73b6
Author: Kirs <[email protected]>
AuthorDate: Thu Dec 8 22:44:04 2022 +0800

    [Engine][Checkpoint]Allow users to choose their own storage plugins (#3683)
---
 .../config/YamlSeaTunnelDomConfigProcessor.java      | 20 ++++++++++++++++++++
 .../config/server/CheckpointStorageConfig.java       |  8 ++++++++
 .../common/config/server/ServerConfigOptions.java    |  5 +++++
 .../common/config/YamlSeaTunnelConfigParserTest.java | 19 ++++++++++---------
 .../src/test/resources/seatunnel.yaml                |  7 ++++++-
 seatunnel-engine/seatunnel-engine-server/pom.xml     |  5 +++++
 .../engine/server/checkpoint/CheckpointManager.java  |  3 +--
 .../seatunnel/engine/server/master/JobMaster.java    |  1 +
 8 files changed, 56 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5cc22bf5f..22e137b09 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -33,6 +33,9 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import org.w3c.dom.Node;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor {
     private static final ILogger LOGGER = 
Logger.getLogger(YamlSeaTunnelDomConfigProcessor.class);
 
@@ -141,6 +144,9 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
             } else if 
(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) {
                 
checkpointStorageConfig.setMaxRetainedCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(),
                     getTextContent(node)));
+            } else if 
(ServerConfigOptions.CHECKPOINT_STORAGE_PLUGIN_CONFIG.key().equals(name)) {
+                Map<String, String> pluginConfig = 
parseCheckpointPluginConfig(node);
+                checkpointStorageConfig.setStoragePluginConfig(pluginConfig);
             } else {
                 LOGGER.warning("Unrecognized element: " + name);
             }
@@ -148,4 +154,18 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
         return checkpointStorageConfig;
     }
 
+    /**
+     * Parse checkpoint plugin config.
+     * @param checkpointPluginConfigNode checkpoint plugin config node
+     * @return checkpoint plugin config
+     */
+    private Map<String, String> parseCheckpointPluginConfig(Node 
checkpointPluginConfigNode) {
+        Map<String, String> checkpointPluginConfig = new HashMap<>();
+        for (Node node : childElements(checkpointPluginConfigNode)) {
+            String name = cleanNodeName(node);
+            checkpointPluginConfig.put(name, getTextContent(node));
+        }
+        return checkpointPluginConfig;
+    }
+
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
index 18f915c88..b46ea1787 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
@@ -19,10 +19,18 @@ package org.apache.seatunnel.engine.common.config.server;
 
 import lombok.Data;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @Data
 public class CheckpointStorageConfig {
 
     private String storage = 
ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
 
     private int maxRetainedCheckpoints = 
ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue();
+
+    /**
+     * Storage plugin instance configuration
+     */
+    private Map<String, String> storagePluginConfig = new HashMap<>();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 22be81e01..5dc5e0080 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
+import java.util.Map;
+
 @SuppressWarnings("checkstyle:MagicNumber")
 public class ServerConfigOptions {
 
@@ -53,4 +55,7 @@ public class ServerConfigOptions {
 
     public static final Option<CheckpointConfig> CHECKPOINT = 
Options.key("checkpoint").type(new TypeReference<CheckpointConfig>() {
     }).defaultValue(new CheckpointConfig()).withDescription("The checkpoint 
configuration.");
+
+    public static final Option<Map<String, String>> 
CHECKPOINT_STORAGE_PLUGIN_CONFIG = Options.key("plugin-config").type(new 
TypeReference<Map<String, String>>() {
+    }).noDefaultValue().withDescription("The checkpoint storage instance 
configuration.");
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index f4d507e85..bf17925d9 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -34,25 +34,26 @@ public class YamlSeaTunnelConfigParserTest {
         }
         Assertions.assertNotNull(config);
 
-        Assertions.assertEquals(config.getEngineConfig().getBackupCount(), 1);
+        Assertions.assertEquals(1, config.getEngineConfig().getBackupCount());
 
-        
Assertions.assertEquals(config.getEngineConfig().getPrintExecutionInfoInterval(),
 2);
+        Assertions.assertEquals(2, 
config.getEngineConfig().getPrintExecutionInfoInterval());
 
         
Assertions.assertFalse(config.getEngineConfig().getSlotServiceConfig().isDynamicSlot());
 
-        
Assertions.assertEquals(config.getEngineConfig().getSlotServiceConfig().getSlotNum(),
 5);
+        Assertions.assertEquals(5, 
config.getEngineConfig().getSlotServiceConfig().getSlotNum());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointInterval(),
 6000);
+        Assertions.assertEquals(6000, 
config.getEngineConfig().getCheckpointConfig().getCheckpointInterval());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout(),
 7000);
+        Assertions.assertEquals(7000, 
config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints(),
 5);
+        Assertions.assertEquals(5, 
config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints(),
 2);
+        Assertions.assertEquals(2, 
config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getStorage(),
 "test");
+        Assertions.assertEquals("hdfs", 
config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());
 
-        
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints(),
 3);
+        Assertions.assertEquals(3, 
config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints());
+        Assertions.assertEquals("secret-key", 
config.getEngineConfig().getCheckpointConfig().getStorage().getStoragePluginConfig().get("s3.secret-key"));
 
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 4a2e85121..8d554763d 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -28,5 +28,10 @@ seatunnel:
             max-concurrent: 5
             tolerable-failure: 2
             storage:
-                type: test
+                type: hdfs
                 max-retained: 3
+                plugin-config:
+                    storage-type: s3
+                    s3.access-key: access-key
+                    s3.secret-key: secret-key
+                    
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml 
b/seatunnel-engine/seatunnel-engine-server/pom.xml
index da0abd9f9..e83504353 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -44,6 +44,11 @@
             <artifactId>checkpoint-storage-local-file</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>checkpoint-storage-hdfs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.hazelcast</groupId>
             <artifactId>hazelcast</artifactId>
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 688fe0a42..704bbd740 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -49,7 +49,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -87,7 +86,7 @@ public class CheckpointManager {
         this.checkpointStorage =
             
FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), 
CheckpointStorageFactory.class,
                     checkpointConfig.getStorage().getStorage())
-                .create(new ConcurrentHashMap<>());
+                
.create(checkpointConfig.getStorage().getStoragePluginConfig());
         IMap<Integer, Long> checkpointIdMap =
             
nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", 
jobId));
         this.coordinatorMap = checkpointPlanMap.values().parallelStream()
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cc559d11a..b0b91d1bc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -197,6 +197,7 @@ public class JobMaster extends Thread {
         CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
         
storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
         storageConfig.setStorage(engine.getStorage().getStorage());
+        
storageConfig.setStoragePluginConfig(engine.getStorage().getStoragePluginConfig());
         checkpointConfig.setStorage(storageConfig);
         return checkpointConfig;
     }

Reply via email to