This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c49a62506 [Engine][Checkpoint]Allow users to choose their own storage
plugins (#3683)
c49a62506 is described below
commit c49a6250613c3f8da888c1c6a4bf3142af8a73b6
Author: Kirs <[email protected]>
AuthorDate: Thu Dec 8 22:44:04 2022 +0800
[Engine][Checkpoint]Allow users to choose their own storage plugins (#3683)
---
.../config/YamlSeaTunnelDomConfigProcessor.java | 20 ++++++++++++++++++++
.../config/server/CheckpointStorageConfig.java | 8 ++++++++
.../common/config/server/ServerConfigOptions.java | 5 +++++
.../common/config/YamlSeaTunnelConfigParserTest.java | 19 ++++++++++---------
.../src/test/resources/seatunnel.yaml | 7 ++++++-
seatunnel-engine/seatunnel-engine-server/pom.xml | 5 +++++
.../engine/server/checkpoint/CheckpointManager.java | 3 +--
.../seatunnel/engine/server/master/JobMaster.java | 1 +
8 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5cc22bf5f..22e137b09 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -33,6 +33,9 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import org.w3c.dom.Node;
+import java.util.HashMap;
+import java.util.Map;
+
public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor {
private static final ILogger LOGGER =
Logger.getLogger(YamlSeaTunnelDomConfigProcessor.class);
@@ -141,6 +144,9 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
} else if
(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) {
checkpointStorageConfig.setMaxRetainedCheckpoints(getIntegerValue(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(),
getTextContent(node)));
+ } else if
(ServerConfigOptions.CHECKPOINT_STORAGE_PLUGIN_CONFIG.key().equals(name)) {
+ Map<String, String> pluginConfig =
parseCheckpointPluginConfig(node);
+ checkpointStorageConfig.setStoragePluginConfig(pluginConfig);
} else {
LOGGER.warning("Unrecognized element: " + name);
}
@@ -148,4 +154,18 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
return checkpointStorageConfig;
}
+ /**
+ * Parse checkpoint plugin config: each child element becomes one entry (cleaned node name to text content).
+ * @param checkpointPluginConfigNode node whose child elements are the individual plugin settings
+ * @return mutable map of setting name to text value; empty when the node has no child elements
+ */
+ private Map<String, String> parseCheckpointPluginConfig(Node
checkpointPluginConfigNode) {
+ Map<String, String> checkpointPluginConfig = new HashMap<>();
+ for (Node node : childElements(checkpointPluginConfigNode)) {
+ String name = cleanNodeName(node);
+ checkpointPluginConfig.put(name, getTextContent(node));
+ }
+ return checkpointPluginConfig;
+ }
+
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
index 18f915c88..b46ea1787 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
@@ -19,10 +19,18 @@ package org.apache.seatunnel.engine.common.config.server;
import lombok.Data;
+import java.util.HashMap;
+import java.util.Map;
+
@Data
public class CheckpointStorageConfig {
private String storage =
ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
private int maxRetainedCheckpoints =
ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue();
+
+ /**
+ * Key/value settings for the storage plugin instance; defaults to an empty, mutable map.
+ */
+ private Map<String, String> storagePluginConfig = new HashMap<>();
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 22be81e01..5dc5e0080 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Map;
+
@SuppressWarnings("checkstyle:MagicNumber")
public class ServerConfigOptions {
@@ -53,4 +55,7 @@ public class ServerConfigOptions {
public static final Option<CheckpointConfig> CHECKPOINT =
Options.key("checkpoint").type(new TypeReference<CheckpointConfig>() {
}).defaultValue(new CheckpointConfig()).withDescription("The checkpoint
configuration.");
+
+ public static final Option<Map<String, String>>
CHECKPOINT_STORAGE_PLUGIN_CONFIG = Options.key("plugin-config").type(new
TypeReference<Map<String, String>>() {
+ }).noDefaultValue().withDescription("The checkpoint storage instance
configuration.");
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index f4d507e85..bf17925d9 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -34,25 +34,26 @@ public class YamlSeaTunnelConfigParserTest {
}
Assertions.assertNotNull(config);
- Assertions.assertEquals(config.getEngineConfig().getBackupCount(), 1);
+ Assertions.assertEquals(1, config.getEngineConfig().getBackupCount());
-
Assertions.assertEquals(config.getEngineConfig().getPrintExecutionInfoInterval(),
2);
+ Assertions.assertEquals(2,
config.getEngineConfig().getPrintExecutionInfoInterval());
Assertions.assertFalse(config.getEngineConfig().getSlotServiceConfig().isDynamicSlot());
-
Assertions.assertEquals(config.getEngineConfig().getSlotServiceConfig().getSlotNum(),
5);
+ Assertions.assertEquals(5,
config.getEngineConfig().getSlotServiceConfig().getSlotNum());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointInterval(),
6000);
+ Assertions.assertEquals(6000,
config.getEngineConfig().getCheckpointConfig().getCheckpointInterval());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout(),
7000);
+ Assertions.assertEquals(7000,
config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints(),
5);
+ Assertions.assertEquals(5,
config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints(),
2);
+ Assertions.assertEquals(2,
config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getStorage(),
"test");
+ Assertions.assertEquals("hdfs",
config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());
-
Assertions.assertEquals(config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints(),
3);
+ Assertions.assertEquals(3,
config.getEngineConfig().getCheckpointConfig().getStorage().getMaxRetainedCheckpoints());
+ Assertions.assertEquals("secret-key",
config.getEngineConfig().getCheckpointConfig().getStorage().getStoragePluginConfig().get("s3.secret-key"));
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 4a2e85121..8d554763d 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -28,5 +28,10 @@ seatunnel:
max-concurrent: 5
tolerable-failure: 2
storage:
- type: test
+ type: hdfs
max-retained: 3
+ plugin-config:
+ storage-type: s3
+ s3.access-key: access-key
+ s3.secret-key: secret-key
+
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml
b/seatunnel-engine/seatunnel-engine-server/pom.xml
index da0abd9f9..e83504353 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -44,6 +44,11 @@
<artifactId>checkpoint-storage-local-file</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>checkpoint-storage-hdfs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 688fe0a42..704bbd740 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -49,7 +49,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -87,7 +86,7 @@ public class CheckpointManager {
this.checkpointStorage =
FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
- .create(new ConcurrentHashMap<>());
+
.create(checkpointConfig.getStorage().getStoragePluginConfig());
IMap<Integer, Long> checkpointIdMap =
nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d",
jobId));
this.coordinatorMap = checkpointPlanMap.values().parallelStream()
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cc559d11a..b0b91d1bc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -197,6 +197,7 @@ public class JobMaster extends Thread {
CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
storageConfig.setStorage(engine.getStorage().getStorage());
+
storageConfig.setStoragePluginConfig(engine.getStorage().getStoragePluginConfig());
checkpointConfig.setStorage(storageConfig);
return checkpointConfig;
}