This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e0e376a90 [Improve][Zeta] Support restore execute savemode (#9059)
5e0e376a90 is described below
commit 5e0e376a90c34e8f8523e627831bceb83db3aa66
Author: hailin0 <[email protected]>
AuthorDate: Tue Apr 8 13:32:34 2025 +0800
[Improve][Zeta] Support restore execute savemode (#9059)
---
.../seatunnel/api/sink/DefaultSaveModeHandler.java | 10 ++++
.../apache/seatunnel/api/sink/SaveModeHandler.java | 2 +
.../api/sink/DefaultSaveModeHandlerTest.java | 64 ++++++++++++++++++++++
.../e2e/sink/inmemory/InMemorySaveModeHandler.java | 3 +
.../core/parse/MultipleTableJobConfigParser.java | 21 +++++++
.../seatunnel/engine/server/master/JobMaster.java | 13 +++--
6 files changed, 109 insertions(+), 4 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index e34b8ba437..07200b29cb 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -117,6 +117,16 @@ public class DefaultSaveModeHandler implements
SaveModeHandler {
}
}
+ @Override
+ public void handleSchemaSaveModeWithRestore() {
+ if (SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST == schemaSaveMode) {
+ errorWhenSchemaNotExist();
+ } else if (SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST ==
schemaSaveMode
+ || SchemaSaveMode.RECREATE_SCHEMA == schemaSaveMode) {
+ createSchemaWhenNotExist();
+ }
+ }
+
protected void recreateSchema() {
if (tableExists()) {
dropTable();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index 3eddaf0514..5b67f934f0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -28,6 +28,8 @@ public interface SaveModeHandler extends AutoCloseable {
void handleDataSaveMode();
+ void handleSchemaSaveModeWithRestore();
+
SchemaSaveMode getSchemaSaveMode();
DataSaveMode getDataSaveMode();
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
index 6f4785ce22..56b6b99155 100644
--- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java
@@ -22,15 +22,24 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.InMemoryCatalog;
import org.apache.seatunnel.api.table.catalog.InMemoryCatalogFactory;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class DefaultSaveModeHandlerTest {
@@ -115,6 +124,61 @@ public class DefaultSaveModeHandlerTest {
"Should not truncate data for recreated table");
}
+ @Test
+ public void handlesErrorWhenSchemaNotExist() {
+ Catalog catalog = mock(Catalog.class);
+ CatalogTable catalogTable = createCatalogTable("notExistsTable");
+ when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+ DefaultSaveModeHandler handler =
+ new DefaultSaveModeHandler(
+ SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST,
+ DataSaveMode.APPEND_DATA,
+ catalog,
+ catalogTable,
+ null);
+
+ assertThrows(SeaTunnelRuntimeException.class,
handler::handleSchemaSaveModeWithRestore);
+ }
+
+ @Test
+ public void createsSchemaWhenNotExist() {
+ CatalogTable catalogTable = createCatalogTable("notExistsTable");
+
+ Catalog catalog = mock(Catalog.class);
+ when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+ DefaultSaveModeHandler handler =
+ new DefaultSaveModeHandler(
+ SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
+ DataSaveMode.APPEND_DATA,
+ catalog,
+ catalogTable,
+ null);
+
+ handler.handleSchemaSaveModeWithRestore();
+
+ verify(catalog, times(1))
+ .createTable(any(TablePath.class), any(CatalogTable.class),
eq(true));
+ }
+
+ @Test
+ public void recreatesSchemaWhenNotExist() {
+ CatalogTable catalogTable = createCatalogTable("notExistsTable");
+ Catalog catalog = mock(Catalog.class);
+ when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
+ DefaultSaveModeHandler handler =
+ new DefaultSaveModeHandler(
+ SchemaSaveMode.RECREATE_SCHEMA,
+ DataSaveMode.APPEND_DATA,
+ catalog,
+ catalogTable,
+ null);
+
+ handler.handleSchemaSaveModeWithRestore();
+
+ verify(catalog, times(1))
+ .createTable(any(TablePath.class), any(CatalogTable.class),
eq(true));
+ }
+
private CatalogTable createCatalogTable(String tableName) {
return CatalogTableUtil.getCatalogTable("", "st", "public", tableName,
rowType);
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
index 61084819a2..a0645672d9 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -55,6 +55,9 @@ public class InMemorySaveModeHandler implements
SaveModeHandler {
log.info("handle data savemode with table path: {}",
catalogTable.getTablePath());
}
+ @Override
+ public void handleSchemaSaveModeWithRestore() {}
+
@Override
public SchemaSaveMode getSchemaSaveMode() {
return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index e3414b8dbd..d2b47f5009 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -690,6 +690,8 @@ public class MultipleTableJobConfigParser {
actionConfig);
if (!isStartWithSavePoint) {
handleSaveMode(sink);
+ } else {
+ handleSchemaSaveModeWithRestore(sink);
}
sinkAction.setParallelism(parallelism);
return sinkAction;
@@ -716,6 +718,25 @@ public class MultipleTableJobConfigParser {
}
}
+ public void handleSchemaSaveModeWithRestore(SeaTunnelSink<?, ?, ?, ?>
sink) {
+ if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
+ SupportSaveMode saveModeSink = (SupportSaveMode) sink;
+ if (envOptions
+ .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+ .equals(SaveModeExecuteLocation.CLIENT)) {
+ Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.open();
+ handler.handleSchemaSaveModeWithRestore();
+ } catch (Exception e) {
+ throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
+ }
+ }
+ }
+
private List<URL> getSourcePluginJarPaths(Config sourceConfig) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new
SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 7881920318..e512a7fbc7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -271,7 +271,8 @@ public class JobMaster {
logicalVertexIdClassLoaderMap.get(
sink.getId()));
JobMaster.handleSaveMode(
- ((SinkAction<?, ?, ?, ?>)
sink).getSink());
+ ((SinkAction<?, ?, ?, ?>)
sink).getSink(),
+ logicalDag.isStartWithSavePoint());
});
Thread.currentThread().setContextClassLoader(appClassLoader);
}
@@ -556,14 +557,18 @@ public class JobMaster {
}
}
- public static void handleSaveMode(SeaTunnelSink sink) {
+ public static void handleSaveMode(SeaTunnelSink sink, boolean
isStartWithSavePoint) {
if (sink instanceof SupportSaveMode) {
Optional<SaveModeHandler> saveModeHandler =
((SupportSaveMode) sink).getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
handler.open();
- new SaveModeExecuteWrapper(handler).execute();
+ if (!isStartWithSavePoint) {
+ new SaveModeExecuteWrapper(handler).execute();
+ } else {
+ handler.handleSchemaSaveModeWithRestore();
+ }
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
@@ -571,7 +576,7 @@ public class JobMaster {
} else if (sink instanceof MultiTableSink) {
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
- handleSaveMode(seaTunnelSink);
+ handleSaveMode(seaTunnelSink, isStartWithSavePoint);
}
}
}