This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cc5949988b [Improve][Core] Move MultiTableSink to seatunnel-api module 
(#7243)
cc5949988b is described below

commit cc5949988b7a851c49d594389fd3b96dbe0f9508
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 23 14:27:07 2024 +0800

    [Improve][Core] Move MultiTableSink to seatunnel-api module (#7243)
    
    * [Improve][Core] Move MultiTableSink to seatunnel-api module
    
    * [Improve][Core] Move MultiTableSink to seatunnel-api module
---
 .../sink}/multitablesink/MultiTableAggregatedCommitInfo.java  |  2 +-
 .../api/sink}/multitablesink/MultiTableCommitInfo.java        |  2 +-
 .../seatunnel/api/sink}/multitablesink/MultiTableSink.java    |  6 ++++--
 .../multitablesink/MultiTableSinkAggregatedCommitter.java     |  2 +-
 .../api/sink}/multitablesink/MultiTableSinkCommitter.java     |  2 +-
 .../api/sink}/multitablesink/MultiTableSinkFactory.java       |  2 +-
 .../api/sink}/multitablesink/MultiTableSinkWriter.java        |  2 +-
 .../seatunnel/api/sink}/multitablesink/MultiTableState.java   |  2 +-
 .../api/sink}/multitablesink/MultiTableWriterRunnable.java    |  2 +-
 .../seatunnel/api/sink}/multitablesink/SinkContextProxy.java  |  2 +-
 .../seatunnel/api/sink}/multitablesink/SinkIdentifier.java    |  2 +-
 .../org/apache/seatunnel/api/table/factory/FactoryUtil.java   |  3 ++-
 .../org/apache/seatunnel/engine/server/master/JobMaster.java  | 11 +++--------
 13 files changed, 19 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
index 5d378140e9..585a8f4e06 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
index 21faf0c7ed..8b12fa07c5 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 7abb176117..bb04283ca6 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
+import lombok.Getter;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -44,7 +46,7 @@ public class MultiTableSink
                 MultiTableCommitInfo,
                 MultiTableAggregatedCommitInfo> {
 
-    private final Map<String, SeaTunnelSink> sinks;
+    @Getter private final Map<String, SeaTunnelSink> sinks;
     private final int replicaNum;
 
     public MultiTableSink(MultiTableFactoryContext context) {
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
similarity index 99%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
index 31dd91f1ee..6ed04d871b 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
index ed52fafb00..113e269fd0 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.sink.SinkCommitter;
 
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
similarity index 96%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
index 00e1e1ab13..08db91b7c8 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
similarity index 99%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 12163676d7..3c73435faf 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkWriter;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
index 43f5d8bd99..ac7db893ba 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
index ce22e0e2e2..3026dc778b 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
similarity index 95%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
index f7691ddedf..3a97bb27bc 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.event.EventListener;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
similarity index 94%
rename from 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
rename to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
index 18f7484853..50eac7c0d9 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 668ff2a43c..79c0c18706 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.env.ParsingMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
@@ -151,7 +152,7 @@ public final class FactoryUtil {
                     ClassLoader classLoader) {
         try {
             TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
factory =
-                    discoverFactory(classLoader, TableSinkFactory.class, 
"MultiTableSink");
+                    new MultiTableSinkFactory();
             MultiTableFactoryContext context =
                     new MultiTableFactoryContext(options, classLoader, sinks);
             
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index e9928a018a..aa74460b05 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -26,9 +26,9 @@ import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
@@ -375,13 +375,8 @@ public class JobMaster {
                     throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                 }
             }
-        } else if (sink.getClass()
-                .getName()
-                .equals(
-                        
"org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink"))
 {
-            // TODO we should not use class name to judge the sink type
-            Map<String, SeaTunnelSink> sinks =
-                    (Map<String, SeaTunnelSink>) 
ReflectionUtils.getField(sink, "sinks").get();
+        } else if (sink instanceof MultiTableSink) {
+            Map<String, SeaTunnelSink> sinks = ((MultiTableSink) 
sink).getSinks();
             for (SeaTunnelSink seaTunnelSink : sinks.values()) {
                 handleSaveMode(seaTunnelSink);
             }

Reply via email to