This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cc5949988b [Improve][Core] Move MultiTableSink to seatunnel-api module
(#7243)
cc5949988b is described below
commit cc5949988b7a851c49d594389fd3b96dbe0f9508
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 23 14:27:07 2024 +0800
[Improve][Core] Move MultiTableSink to seatunnel-api module (#7243)
* [Improve][Core] Move MultiTableSink to seatunnel-api module
* [Improve][Core] Move MultiTableSink to seatunnel-api module
---
.../sink}/multitablesink/MultiTableAggregatedCommitInfo.java | 2 +-
.../api/sink}/multitablesink/MultiTableCommitInfo.java | 2 +-
.../seatunnel/api/sink}/multitablesink/MultiTableSink.java | 6 ++++--
.../multitablesink/MultiTableSinkAggregatedCommitter.java | 2 +-
.../api/sink}/multitablesink/MultiTableSinkCommitter.java | 2 +-
.../api/sink}/multitablesink/MultiTableSinkFactory.java | 2 +-
.../api/sink}/multitablesink/MultiTableSinkWriter.java | 2 +-
.../seatunnel/api/sink}/multitablesink/MultiTableState.java | 2 +-
.../api/sink}/multitablesink/MultiTableWriterRunnable.java | 2 +-
.../seatunnel/api/sink}/multitablesink/SinkContextProxy.java | 2 +-
.../seatunnel/api/sink}/multitablesink/SinkIdentifier.java | 2 +-
.../org/apache/seatunnel/api/table/factory/FactoryUtil.java | 3 ++-
.../org/apache/seatunnel/engine/server/master/JobMaster.java | 11 +++--------
13 files changed, 19 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
index 5d378140e9..585a8f4e06 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
index 21faf0c7ed..8b12fa07c5 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 7abb176117..bb04283ca6 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import lombok.Getter;
+
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -44,7 +46,7 @@ public class MultiTableSink
MultiTableCommitInfo,
MultiTableAggregatedCommitInfo> {
- private final Map<String, SeaTunnelSink> sinks;
+ @Getter private final Map<String, SeaTunnelSink> sinks;
private final int replicaNum;
public MultiTableSink(MultiTableFactoryContext context) {
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
similarity index 99%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
index 31dd91f1ee..6ed04d871b 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
similarity index 98%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
index ed52fafb00..113e269fd0 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.sink.SinkCommitter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
index 00e1e1ab13..08db91b7c8 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
similarity index 99%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 12163676d7..3c73435faf 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
index 43f5d8bd99..ac7db893ba 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
index ce22e0e2e2..3026dc778b 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
index f7691ddedf..3a97bb27bc 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
similarity index 94%
rename from
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
rename to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
index 18f7484853..50eac7c0d9 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.common.multitablesink;
+package org.apache.seatunnel.api.sink.multitablesink;
import lombok.EqualsAndHashCode;
import lombok.Getter;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 668ff2a43c..79c0c18706 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
@@ -151,7 +152,7 @@ public final class FactoryUtil {
ClassLoader classLoader) {
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory =
- discoverFactory(classLoader, TableSinkFactory.class,
"MultiTableSink");
+ new MultiTableSinkFactory();
MultiTableFactoryContext context =
new MultiTableFactoryContext(options, classLoader, sinks);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index e9928a018a..aa74460b05 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -26,9 +26,9 @@ import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
@@ -375,13 +375,8 @@ public class JobMaster {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
- } else if (sink.getClass()
- .getName()
- .equals(
-
"org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink"))
{
- // TODO we should not use class name to judge the sink type
- Map<String, SeaTunnelSink> sinks =
- (Map<String, SeaTunnelSink>)
ReflectionUtils.getField(sink, "sinks").get();
+ } else if (sink instanceof MultiTableSink) {
+ Map<String, SeaTunnelSink> sinks = ((MultiTableSink)
sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}