This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 80cf91318d [Improve][Zeta] Move SaveMode behavior to master (#6843)
80cf91318d is described below
commit 80cf91318d6daebf4096a73c431cf270ef053785
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jun 13 11:29:33 2024 +0800
[Improve][Zeta] Move SaveMode behavior to master (#6843)
---
docs/en/concept/JobEnvConfig.md | 6 +
.../apache/seatunnel/api/env/EnvCommonOptions.java | 7 ++
.../apache/seatunnel/api/env/EnvOptionRule.java | 1 +
.../api/sink/SaveModeExecuteLocation.java | 24 ++++
.../seatunnel/connectors/doris/sink/DorisSink.java | 6 +
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 +
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 5 +
.../e2e/sink/inmemory/InMemorySaveModeHandler.java | 137 +++++++++++++++++++++
.../seatunnel/e2e/sink/inmemory/InMemorySink.java | 15 ++-
.../e2e/sink/inmemory/InMemorySinkFactory.java | 2 +-
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 45 +++++++
.../engine/e2e/classloader/ClassLoaderITBase.java | 2 +-
.../savemode/fake_to_inmemory_savemode.conf | 57 +++++++++
.../savemode/fake_to_inmemory_savemode_client.conf | 58 +++++++++
.../engine/core/dag/logical/LogicalDag.java | 13 ++
.../core/dag/logical/LogicalDagGenerator.java | 13 +-
.../engine/core/job/AbstractJobEnvironment.java | 2 +-
.../engine/core/parse/JobConfigParser.java | 6 +-
.../core/parse/MultipleTableJobConfigParser.java | 32 ++---
.../seatunnel/engine/server/master/JobMaster.java | 62 +++++++++-
20 files changed, 469 insertions(+), 29 deletions(-)
diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 32bf089e92..66104f7fbc 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -33,6 +33,12 @@ This parameter configures the parallelism of source and sink.
Used to control the default retry times when a job fails. The default value is
3, and it only works in the Zeta engine.
+### savemode.execute.location
+
+This parameter specifies where SaveMode is executed when the job runs in the Zeta engine.
+The default value is `CLUSTER`, which means SaveMode is executed on the cluster. If you want to execute SaveMode on the client,
+you can set it to `CLIENT`. Please use `CLUSTER` whenever possible: `CLIENT` mode is deprecated and will be removed once `CLUSTER` mode has proven stable.
+
### shade.identifier
Specify the method of encryption, if you didn't have the requirement for
encrypting or decrypting config files, this option can be ignored.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 75e58a5f5b..cabf0856dc 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.env;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
import org.apache.seatunnel.common.constants.JobMode;
import java.util.Map;
@@ -76,6 +77,12 @@ public interface EnvCommonOptions {
.noDefaultValue()
.withDescription("The timeout (in milliseconds) for a
checkpoint.");
+ Option<SaveModeExecuteLocation> SAVEMODE_EXECUTE_LOCATION =
+ Options.key("savemode.execute.location")
+ .enumType(SaveModeExecuteLocation.class)
+ .defaultValue(SaveModeExecuteLocation.CLUSTER)
+ .withDescription("The location where SaveMode is executed.");
+
Option<String> JARS =
Options.key("jars")
.stringType()
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index d4caa710d8..262cfe065e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -33,6 +33,7 @@ public class EnvOptionRule {
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
+ EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
new file mode 100644
index 0000000000..3184bce3fb
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteLocation.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public enum SaveModeExecuteLocation {
+ @Deprecated
+ CLIENT,
+ CLUSTER
+}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index e0ef8e1893..e14b64d9a2 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -106,6 +106,12 @@ public class DorisSink
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
+ // Load the JDBC driver into DriverManager
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
CatalogFactory catalogFactory =
discoverFactory(
Thread.currentThread().getContextClassLoader(),
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 384319d022..69b01f10d2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -167,6 +167,11 @@ public class JdbcSink
@Override
public Optional<SaveModeHandler> getSaveModeHandler() {
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
if (catalogTable != null) {
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return Optional.empty();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 1cf08c4c43..d8c7b661c5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -77,6 +77,11 @@ public class JdbcSinkWriter
public MultiTableResourceManager<ConnectionPoolManager>
initMultiTableResourceManager(
int tableSize, int queueSize) {
HikariDataSource ds = new HikariDataSource();
+ try {
+
Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
ds.setIdleTimeout(30 * 1000);
ds.setMaximumPoolSize(queueSize);
ds.setJdbcUrl(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
new file mode 100644
index 0000000000..e2c28bd447
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.sink.inmemory;
+
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class InMemorySaveModeHandler implements SaveModeHandler {
+
+ private final CatalogTable catalogTable;
+
+ public InMemorySaveModeHandler(CatalogTable catalogTable) {
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public void handleSchemaSaveMode() {
+ log.info("handle schema savemode with table path: {}",
catalogTable.getTablePath());
+ }
+
+ @Override
+ public void handleDataSaveMode() {
+ log.info("handle data savemode with table path: {}",
catalogTable.getTablePath());
+ }
+
+ @Override
+ public SchemaSaveMode getSchemaSaveMode() {
+ return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;
+ }
+
+ @Override
+ public DataSaveMode getDataSaveMode() {
+ return DataSaveMode.APPEND_DATA;
+ }
+
+ @Override
+ public TablePath getHandleTablePath() {
+ return catalogTable.getTablePath();
+ }
+
+ @Override
+ public Catalog getHandleCatalog() {
+ return new Catalog() {
+ @Override
+ public void open() throws CatalogException {}
+
+ @Override
+ public void close() throws CatalogException {}
+
+ @Override
+ public String name() {
+ return "InMemoryCatalog";
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return null;
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws
CatalogException {
+ return false;
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return null;
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ return null;
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws
CatalogException {
+ return false;
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ return null;
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException,
DatabaseNotExistException,
+ CatalogException {}
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean
ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {}
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {}
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean
ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {}
+ };
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 3efe0ad00e..8f1eba9af4 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -20,10 +20,13 @@ package org.apache.seatunnel.e2e.sink.inmemory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import java.io.IOException;
@@ -35,11 +38,14 @@ public class InMemorySink
InMemoryState,
InMemoryCommitInfo,
InMemoryAggregatedCommitInfo>,
- SupportMultiTableSink {
+ SupportMultiTableSink,
+ SupportSaveMode {
private ReadonlyConfig config;
+ private CatalogTable catalogTable;
- public InMemorySink(ReadonlyConfig config) {
+ public InMemorySink(CatalogTable catalogTable, ReadonlyConfig config) {
+ this.catalogTable = catalogTable;
this.config = config;
}
@@ -69,4 +75,9 @@ public class InMemorySink
public Optional<Serializer<InMemoryAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
+
+ @Override
+ public Optional<SaveModeHandler> getSaveModeHandler() {
+ return Optional.of(new InMemorySaveModeHandler(catalogTable));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 16f1c0dc44..7b06ec99d9 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -56,6 +56,6 @@ public class InMemorySinkFactory
@Override
public TableSink<SeaTunnelRow, InMemoryState, InMemoryCommitInfo,
InMemoryAggregatedCommitInfo>
createSink(TableSinkFactoryContext context) {
- return () -> new InMemorySink(context.getOptions());
+ return () -> new InMemorySink(context.getCatalogTable(),
context.getOptions());
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index bcbf40b9f7..3d871adb5a 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -70,6 +70,51 @@ public class JobClientJobProxyIT extends SeaTunnelContainer {
Assertions.assertNotEquals(0, execResult.getExitCode());
}
+ @Test
+ public void testSaveModeOnMasterOrClient() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
+ executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int serverLogLength = 0;
+ String serverLogs = server.getLogs();
+ Assertions.assertTrue(
+ serverLogs.contains(
+
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema
savemode with table path: test.table1"));
+ Assertions.assertTrue(
+ serverLogs.contains(
+
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data
savemode with table path: test.table1"));
+ Assertions.assertTrue(
+ serverLogs.contains(
+
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle schema
savemode with table path: test.table2"));
+ Assertions.assertTrue(
+ serverLogs.contains(
+
"org.apache.seatunnel.e2e.sink.inmemory.InMemorySaveModeHandler - handle data
savemode with table path: test.table2"));
+
+ // restore will not execute savemode
+ execResult = restoreJob(server,
"/savemode/fake_to_inmemory_savemode.conf", "1");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // clear old logs
+ serverLogLength += serverLogs.length();
+ serverLogs = server.getLogs().substring(serverLogLength);
+ Assertions.assertFalse(serverLogs.contains("handle schema savemode
with table path"));
+ Assertions.assertFalse(serverLogs.contains("handle data savemode with
table path"));
+
+ // test savemode on client side
+ Container.ExecResult execResult2 =
+ executeJob(server,
"/savemode/fake_to_inmemory_savemode_client.conf");
+ Assertions.assertEquals(0, execResult2.getExitCode());
+ // clear old logs
+ serverLogLength += serverLogs.length();
+ serverLogs = server.getLogs().substring(serverLogLength);
+ Assertions.assertFalse(serverLogs.contains("handle schema savemode
with table path"));
+ Assertions.assertFalse(serverLogs.contains("handle data savemode with
table path"));
+
+ Assertions.assertTrue(
+ execResult2.getStdout().contains("handle schema savemode with
table path"));
+ Assertions.assertTrue(
+ execResult2.getStdout().contains("handle data savemode with
table path"));
+ }
+
@Test
public void testJobFailedWillThrowException() throws IOException,
InterruptedException {
Container.ExecResult execResult =
executeSeaTunnelJob("/batch_slot_not_enough.conf");
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
index 60907b3c0e..cdeef180f6 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
@@ -158,7 +158,7 @@ public abstract class ClassLoaderITBase extends
SeaTunnelContainer {
if (cacheMode()) {
Assertions.assertTrue(3 >= getClassLoaderCount());
} else {
- Assertions.assertTrue(2 + i >= getClassLoaderCount());
+ Assertions.assertTrue(3 + 2 * i >= getClassLoaderCount());
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
new file mode 100644
index 0000000000..677589b0ac
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 1
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ ]
+ }
+ },
+ {
+ row.num = 1
+ schema = {
+ table = "test.table2"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+
+sink{
+ InMemory {
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
new file mode 100644
index 0000000000..e7b15a25f2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/savemode/fake_to_inmemory_savemode_client.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ savemode.execute.location = client
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ row.num = 1
+ schema = {
+ table = "test.table1"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ ]
+ }
+ },
+ {
+ row.num = 1
+ schema = {
+ table = "test.table2"
+ columns = [
+ {
+ name = id
+ type = bigint
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+
+sink{
+ InMemory {
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
index 38fd74f3ee..8739a82d2a 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java
@@ -61,6 +61,7 @@ public class LogicalDag implements IdentifiedDataSerializable
{
private final Set<LogicalEdge> edges = new LinkedHashSet<>();
private final Map<Long, LogicalVertex> logicalVertexMap = new
LinkedHashMap<>();
private IdGenerator idGenerator;
+ private boolean isStartWithSavePoint = false;
public LogicalDag() {}
@@ -85,6 +86,14 @@ public class LogicalDag implements
IdentifiedDataSerializable {
return logicalVertexMap;
}
+ public boolean isStartWithSavePoint() {
+ return isStartWithSavePoint;
+ }
+
+ public void setStartWithSavePoint(boolean startWithSavePoint) {
+ isStartWithSavePoint = startWithSavePoint;
+ }
+
@NonNull public JsonObject getLogicalDagAsJson() {
JsonObject logicalDag = new JsonObject();
JsonArray vertices = new JsonArray();
@@ -143,6 +152,8 @@ public class LogicalDag implements
IdentifiedDataSerializable {
out.writeObject(jobConfig);
out.writeObject(idGenerator);
+
+ out.writeBoolean(isStartWithSavePoint);
}
@Override
@@ -165,6 +176,8 @@ public class LogicalDag implements
IdentifiedDataSerializable {
jobConfig = in.readObject();
idGenerator = in.readObject();
+
+ isStartWithSavePoint = in.readBoolean();
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index f847c2127b..6a56240b99 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -38,6 +38,7 @@ public class LogicalDagGenerator {
private List<Action> actions;
private JobConfig jobConfig;
private IdGenerator idGenerator;
+ private boolean isStartWithSavePoint;
private final Map<Long, LogicalVertex> logicalVertexMap = new
LinkedHashMap<>();
@@ -51,10 +52,19 @@ public class LogicalDagGenerator {
@NonNull List<Action> actions,
@NonNull JobConfig jobConfig,
@NonNull IdGenerator idGenerator) {
+ this(actions, jobConfig, idGenerator, false);
+ }
+
+ public LogicalDagGenerator(
+ @NonNull List<Action> actions,
+ @NonNull JobConfig jobConfig,
+ @NonNull IdGenerator idGenerator,
+ boolean isStartWithSavePoint) {
this.actions = actions;
this.jobConfig = jobConfig;
this.idGenerator = idGenerator;
- if (actions.size() <= 0) {
+ this.isStartWithSavePoint = isStartWithSavePoint;
+ if (actions.isEmpty()) {
throw new IllegalStateException("No actions define in the job.
Cannot execute.");
}
}
@@ -65,6 +75,7 @@ public class LogicalDagGenerator {
LogicalDag logicalDag = new LogicalDag(jobConfig, idGenerator);
logicalDag.getEdges().addAll(logicalEdges);
logicalDag.getLogicalVertexMap().putAll(logicalVertexMap);
+ logicalDag.setStartWithSavePoint(isStartWithSavePoint);
return logicalDag;
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index 9f54db03d5..49c9b9275d 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -146,7 +146,7 @@ public abstract class AbstractJobEnvironment {
protected abstract MultipleTableJobConfigParser getJobConfigParser();
protected LogicalDagGenerator getLogicalDagGenerator() {
- return new LogicalDagGenerator(actions, jobConfig, idGenerator);
+ return new LogicalDagGenerator(actions, jobConfig, idGenerator,
isStartWithSavePoint);
}
protected abstract LogicalDag getLogicalDag();
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 981b85049a..2ec19cabc9 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -53,21 +53,23 @@ import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
-import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
@Data
public class JobConfigParser {
private static final ILogger LOGGER =
Logger.getLogger(JobConfigParser.class);
private IdGenerator idGenerator;
private boolean isStartWithSavePoint;
+ private MultipleTableJobConfigParser multipleTableJobConfigParser;
private List<URL> commonPluginJars;
public JobConfigParser(
@NonNull IdGenerator idGenerator,
@NonNull List<URL> commonPluginJars,
+ MultipleTableJobConfigParser multipleTableJobConfigParser,
boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.commonPluginJars = commonPluginJars;
+ this.multipleTableJobConfigParser = multipleTableJobConfigParser;
this.isStartWithSavePoint = isStartWithSavePoint;
}
@@ -166,7 +168,7 @@ public class JobConfigParser {
sink.setJobContext(jobConfig.getJobContext());
sink.setTypeInfo(rowType);
if (!isStartWithSavePoint) {
- handleSaveMode(sink);
+ multipleTableJobConfigParser.handleSaveMode(sink);
}
final String actionName =
createSinkActionName(configIndex,
tuple.getLeft().getPluginName());
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 6e5fa3decd..c1ff66c0d3 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -146,7 +147,7 @@ public class MultipleTableJobConfigParser {
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables);
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.fallbackParser =
- new JobConfigParser(idGenerator, commonPluginJars,
isStartWithSavePoint);
+ new JobConfigParser(idGenerator, commonPluginJars, this,
isStartWithSavePoint);
}
public MultipleTableJobConfigParser(
@@ -162,7 +163,7 @@ public class MultipleTableJobConfigParser {
this.seaTunnelJobConfig = seaTunnelJobConfig;
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.fallbackParser =
- new JobConfigParser(idGenerator, commonPluginJars,
isStartWithSavePoint);
+ new JobConfigParser(idGenerator, commonPluginJars, this,
isStartWithSavePoint);
}
public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService
classLoaderService) {
@@ -268,13 +269,6 @@ public class MultipleTableJobConfigParser {
});
}
- void addCommonPluginJarsToAction(Action action) {
- action.getJarUrls().addAll(commonPluginJars);
- if (!action.getUpstream().isEmpty()) {
- action.getUpstream().forEach(this::addCommonPluginJarsToAction);
- }
- }
-
private void fillJobConfig() {
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
if (StringUtils.isEmpty(jobConfig.getName())
@@ -660,15 +654,21 @@ public class MultipleTableJobConfigParser {
return sinkAction;
}
- public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
+ public void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
- Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
- if (saveModeHandler.isPresent()) {
- try (SaveModeHandler handler = saveModeHandler.get()) {
- new SaveModeExecuteWrapper(handler).execute();
- } catch (Exception e) {
- throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ if (envOptions
+ .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+ .equals(SaveModeExecuteLocation.CLIENT)) {
+ log.warn(
+ "SaveMode execute location on CLIENT is deprecated,
please use SERVER instead.");
+ Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ new SaveModeExecuteWrapper(handler).execute();
+ } catch (Exception e) {
+ throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
}
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2e9e168af2..8f54402d80 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -19,8 +19,16 @@ package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
@@ -32,7 +40,9 @@ import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -80,11 +90,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.common.constants.JobMode.BATCH;
public class JobMaster {
@@ -210,11 +222,22 @@ public class JobMaster {
nodeEngine.getSerializationService(),
classLoader,
jobImmutableInformation.getLogicalDag());
- seaTunnelServer
- .getClassLoaderService()
- .releaseClassLoader(
- jobImmutableInformation.getJobId(),
- jobImmutableInformation.getPluginJarsUrls());
+ if (!restart
+ && !logicalDag.isStartWithSavePoint()
+ &&
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
+ .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+ .equals(SaveModeExecuteLocation.CLUSTER)) {
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ logicalDag.getLogicalVertexMap().values().stream()
+ .map(LogicalVertex::getAction)
+ .filter(action -> action instanceof SinkAction)
+ .map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
+ .forEach(JobMaster::handleSaveMode);
+ } finally {
+ Thread.currentThread().setContextClassLoader(appClassLoader);
+ }
+ }
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
PlanUtils.fromLogicalDAG(
@@ -228,6 +251,11 @@ public class JobMaster {
runningJobStateTimestampsIMap,
engineConfig.getQueueType(),
engineConfig);
+ seaTunnelServer
+ .getClassLoaderService()
+ .releaseClassLoader(
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getPluginJarsUrls());
// revert to app class loader, it may be changed by
PlanUtils.fromLogicalDAG
Thread.currentThread().setContextClassLoader(appClassLoader);
this.physicalPlan = planTuple.f0();
@@ -333,6 +361,30 @@ public class JobMaster {
}
}
+ public static void handleSaveMode(SeaTunnelSink sink) {
+ if (sink instanceof SupportSaveMode) {
+ Optional<SaveModeHandler> saveModeHandler =
+ ((SupportSaveMode) sink).getSaveModeHandler();
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ new SaveModeExecuteWrapper(handler).execute();
+ } catch (Exception e) {
+ throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
+ } else if (sink.getClass()
+ .getName()
+ .equals(
+
"org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink"))
{
+ // TODO we should not use class name to judge the sink type
+ Map<String, SeaTunnelSink> sinks =
+ (Map<String, SeaTunnelSink>)
ReflectionUtils.getField(sink, "sinks").get();
+ for (SeaTunnelSink seaTunnelSink : sinks.values()) {
+ handleSaveMode(seaTunnelSink);
+ }
+ }
+ }
+
public void handleCheckpointError(long pipelineId, boolean neverRestore) {
if (neverRestore) {
this.neverNeedRestore();