This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8c2c5c79a1 [Improve][API] Move catalog open to SaveModeHandler (#7439)
8c2c5c79a1 is described below

commit 8c2c5c79a18cb75a032117d9e28b3d794531de6c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 22 20:13:08 2024 +0800

    [Improve][API] Move catalog open to SaveModeHandler (#7439)
---
 .../java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java   | 5 +++++
 .../src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java | 2 ++
 .../java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java   | 1 -
 .../connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java   | 1 -
 .../seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java     | 1 -
 .../apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java    | 1 -
 .../seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java       | 1 -
 .../seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java       | 1 -
 .../seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java | 1 -
 .../seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java | 1 +
 .../seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java | 1 +
 .../seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java | 1 +
 .../seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java | 1 +
 .../org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java     | 5 ++++-
 .../apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java  | 3 +++
 .../seatunnel/engine/core/parse/MultipleTableJobConfigParser.java    | 1 +
 .../java/org/apache/seatunnel/engine/server/master/JobMaster.java    | 1 +
 17 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 051068dba0..fc07b99000 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -59,6 +59,11 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
                 customSql);
     }
 
+    @Override
+    public void open() {
+        catalog.open();
+    }
+
     @Override
     public void handleSchemaSaveMode() {
         switch (schemaSaveMode) {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index e75c2215dd..3eddaf0514 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 
 public interface SaveModeHandler extends AutoCloseable {
 
+    void open();
+
     void handleSchemaSaveMode();
 
     void handleDataSaveMode();
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index c449dda027..c746fea00c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -125,7 +125,6 @@ public class DorisSink
         }
 
         Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         config.get(DorisOptions.SCHEMA_SAVE_MODE),
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index dee47bfd73..fed65733e0 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -91,7 +91,6 @@ public class ElasticsearchSink
         DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
 
         TablePath tablePath = TablePath.of("", catalogTable.getTableId().getTableName());
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         schemaSaveMode, dataSaveMode, catalog, tablePath, null, null));
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 008ab799b9..65bccbdb89 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -130,7 +130,6 @@ public class IcebergSink
         }
         Catalog catalog =
                 catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), readonlyConfig);
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         config.getSchemaSaveMode(),
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 1ec9ab8883..2ccfba19f2 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -198,7 +198,6 @@ public class JdbcSink
             if (catalogOptional.isPresent()) {
                 try {
                     Catalog catalog = catalogOptional.get();
-                    catalog.open();
                     FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE);
                     String fieldIde =
                             fieldIdeEnumEnum == null
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
index c5b1b82bcc..2015be1973 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -103,7 +103,6 @@ public class MilvusSink
         SchemaSaveMode schemaSaveMode = config.get(MilvusSinkConfig.SCHEMA_SAVE_MODE);
         DataSaveMode dataSaveMode = config.get(MilvusSinkConfig.DATA_SAVE_MODE);
 
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         schemaSaveMode,
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index 23651994ad..fbf04a5038 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -144,7 +144,6 @@ public class PaimonSink
         }
         org.apache.seatunnel.api.table.catalog.Catalog catalog =
                 catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), readonlyConfig);
-        catalog.open();
         return Optional.of(
                 new PaimonSaveModeHandler(
                         this,
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index b9040f72d4..35c9ed9e37 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -79,7 +79,6 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
                         sinkConfig.getPassword(),
                         sinkConfig.getJdbcUrl(),
                         sinkConfig.getSaveModeCreateTemplate());
-        catalog.open();
         return Optional.of(
                 new DefaultSaveModeHandler(
                         schemaSaveMode,
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 51586beaf0..f7ec6bd309 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -130,6 +130,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
                         new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index c713593821..b4f192bc61 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -131,6 +131,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
                         new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 0f3fc75096..20ca02b616 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -122,6 +122,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
                         new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 29ff90cfb8..0cca2f8d68 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -122,6 +122,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
                         new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index f8550a615a..4ece65fda8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.e2e.connector.doris;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -245,7 +246,9 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         Thread.currentThread().getContextClassLoader(),
                         Collections.emptyList());
         SupportSaveMode sink = (SupportSaveMode) dorisSinkFactory.createSink(context).createSink();
-        sink.getSaveModeHandler().get().handleSaveMode();
+        SaveModeHandler handler = sink.getSaveModeHandler().get();
+        handler.open();
+        handler.handleSaveMode();
         CatalogTable createdTable = catalog.getTable(TablePath.of(fullName));
         Assertions.assertEquals(
                 upstreamTable.getTableSchema().getColumns().size(),
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
index e2c28bd447..61084819a2 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java
@@ -42,6 +42,9 @@ public class InMemorySaveModeHandler implements SaveModeHandler {
         this.catalogTable = catalogTable;
     }
 
+    @Override
+    public void open() {}
+
     @Override
     public void handleSchemaSaveMode() {
         log.info("handle schema savemode with table path: {}", catalogTable.getTablePath());
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index d02a76a4c5..cb7f118a6e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -698,6 +698,7 @@ public class MultipleTableJobConfigParser {
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.open();
                         new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index f521c05492..d85cf607dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -372,6 +372,7 @@ public class JobMaster {
                     ((SupportSaveMode) sink).getSaveModeHandler();
             if (saveModeHandler.isPresent()) {
                 try (SaveModeHandler handler = saveModeHandler.get()) {
+                    handler.open();
                     new SaveModeExecuteWrapper(handler).execute();
                 } catch (Exception e) {
                     throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);

Reply via email to