This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1eb81e7f88 [improve] neo4j options (#9164)
1eb81e7f88 is described below

commit 1eb81e7f88cfa147c3657d17168005a7c4d42e62
Author: Jarvis <[email protected]>
AuthorDate: Tue Apr 29 09:51:14 2025 +0800

    [improve] neo4j options (#9164)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 ...eo4jCommonConfig.java => Neo4jBaseOptions.java} |  2 +-
 .../seatunnel/neo4j/config/Neo4jQueryInfo.java     | 20 ++++----
 ...{Neo4jSinkConfig.java => Neo4jSinkOptions.java} |  2 +-
 .../seatunnel/neo4j/config/Neo4jSinkQueryInfo.java |  8 ++--
 ...4jSourceConfig.java => Neo4jSourceOptions.java} |  2 +-
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 31 ++++--------
 .../seatunnel/neo4j/sink/Neo4jSinkFactory.java     | 43 +++++++++--------
 .../seatunnel/neo4j/sink/Neo4jSinkWriter.java      |  2 +-
 .../seatunnel/neo4j/source/Neo4jSource.java        | 55 ++++++----------------
 .../seatunnel/neo4j/source/Neo4jSourceFactory.java | 49 ++++++++++++-------
 11 files changed, 96 insertions(+), 120 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 0a42dfa8d7..d791edf008 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,7 +194,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("TypesenseSinkOptions");
         whiteList.add("PulsarSinkOptions");
         whiteList.add("SlsSinkOptions");
-        whiteList.add("Neo4jSinkOptions");
         whiteList.add("PulsarSourceOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("SlsSourceOptions");
@@ -205,7 +204,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("RocketMqSourceOptions");
         whiteList.add("TablestoreSinkOptions");
         whiteList.add("TableStoreDBSourceOptions");
-        whiteList.add("Neo4jSourceOptions");
         whiteList.add("SocketSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
rename to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
index dcc7828cd2..b844d723bd 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public abstract class Neo4jCommonConfig {
+public abstract class Neo4jBaseOptions {
 
     public static final String PLUGIN_NAME = "Neo4j";
     public static final Long DEFAULT_MAX_TRANSACTION_RETRY_TIME = 30L;
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
index 9730bfa110..f3118d56ca 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
@@ -32,16 +32,16 @@ import lombok.Data;
 import java.io.Serializable;
 import java.net.URI;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_BEARER_TOKEN;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_KERBEROS_TICKET;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_MAX_CONNECTION_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_NEO4J_URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_QUERY;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
 
 /**
  * Because Neo4jQueryInfo is one of the Neo4jSink's member variable, So 
Neo4jQueryInfo need
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
rename to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
index 8d8c42b0e3..87e240436b 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
@@ -23,7 +23,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
 
 import java.util.Map;
 
-public class Neo4jSinkConfig extends Neo4jCommonConfig {
+public class Neo4jSinkOptions extends Neo4jBaseOptions {
     public static final Option<Map<String, String>> QUERY_PARAM_POSITION =
             Options.key("queryParamPosition")
                     .mapType()
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
index 997f7cc8ce..674b96442a 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
@@ -31,10 +31,10 @@ import lombok.Setter;
 
 import java.util.Map;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.MAX_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.WRITE_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.QUERY_PARAM_POSITION;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.WRITE_MODE;
 
 @Getter
 @Setter
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
rename to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
index de29a71ce6..a919e178a7 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
@@ -17,4 +17,4 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
-public class Neo4jSourceConfig extends Neo4jCommonConfig {}
+public class Neo4jSourceOptions extends Neo4jBaseOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index c3af6c7a90..1b422663e3 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -17,52 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
-
-@AutoService(SeaTunnelSink.class)
 public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, 
Void> {
 
-    private SeaTunnelRowType rowType;
+    private CatalogTable catalogTable;
     private Neo4jSinkQueryInfo neo4JSinkQueryInfo;
 
-    @Override
-    public String getPluginName() {
-        return PLUGIN_NAME;
+    public Neo4jSink(CatalogTable catalogTable, Neo4jSinkQueryInfo 
neo4JSinkQueryInfo) {
+        this.catalogTable = catalogTable;
+        this.neo4JSinkQueryInfo = neo4JSinkQueryInfo;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        this.neo4JSinkQueryInfo = new Neo4jSinkQueryInfo(config);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.rowType = seaTunnelRowType;
+    public String getPluginName() {
+        return Neo4jSinkOptions.PLUGIN_NAME;
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, Void, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
+        return new Neo4jSinkWriter(neo4JSinkQueryInfo, 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return SeaTunnelSink.super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
index 70c048dd92..f2623c32f5 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
@@ -18,41 +18,44 @@
 package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_NEO4J_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
-
 @AutoService(Factory.class)
 public class Neo4jSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return PLUGIN_NAME;
+        return Neo4jSinkOptions.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(KEY_NEO4J_URI, KEY_DATABASE, KEY_QUERY, 
QUERY_PARAM_POSITION)
+                .required(
+                        Neo4jSinkOptions.KEY_NEO4J_URI,
+                        Neo4jSinkOptions.KEY_DATABASE,
+                        Neo4jSinkOptions.KEY_QUERY,
+                        Neo4jSinkOptions.QUERY_PARAM_POSITION)
                 .optional(
-                        KEY_USERNAME,
-                        KEY_PASSWORD,
-                        KEY_BEARER_TOKEN,
-                        KEY_KERBEROS_TICKET,
-                        KEY_MAX_CONNECTION_TIMEOUT,
-                        KEY_MAX_TRANSACTION_RETRY_TIME)
+                        Neo4jSinkOptions.KEY_USERNAME,
+                        Neo4jSinkOptions.KEY_PASSWORD,
+                        Neo4jSinkOptions.KEY_BEARER_TOKEN,
+                        Neo4jSinkOptions.KEY_KERBEROS_TICKET,
+                        Neo4jSinkOptions.KEY_MAX_CONNECTION_TIMEOUT,
+                        Neo4jSinkOptions.KEY_MAX_TRANSACTION_RETRY_TIME)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        Neo4jSinkQueryInfo neo4jSinkQueryInfo =
+                new Neo4jSinkQueryInfo(context.getOptions().toConfig());
+        return () -> new Neo4jSink(context.getCatalogTable(), 
neo4jSinkQueryInfo);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
index ccd5707930..2fbe1ebe65 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -46,7 +46,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
 
 @Slf4j
 public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index a4e1f39a14..65fd0ecf4c 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -17,62 +17,37 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
-import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
-import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceOptions.PLUGIN_NAME;
 
-@AutoService(SeaTunnelSource.class)
 public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
 
-    private Neo4jSourceQueryInfo neo4jSourceQueryInfo;
-    private SeaTunnelRowType rowType;
+    private final CatalogTable catalogTable;
+    private final Neo4jSourceQueryInfo neo4jSourceQueryInfo;
+    private final SeaTunnelRowType rowType;
 
-    @Override
-    public String getPluginName() {
-        return PLUGIN_NAME;
+    public Neo4jSource(CatalogTable catalogTable, Neo4jSourceQueryInfo 
neo4jSourceQueryInfo) {
+        this.catalogTable = catalogTable;
+        this.neo4jSourceQueryInfo = neo4jSourceQueryInfo;
+        this.rowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-
-        final CheckResult configCheck =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
ConnectorCommonOptions.SCHEMA.key());
-
-        if (!configCheck.isSuccess()) {
-            throw new Neo4jConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            Neo4jSourceConfig.PLUGIN_NAME,
-                            PluginType.SOURCE,
-                            configCheck.getMsg()));
-        }
-
-        this.neo4jSourceQueryInfo = new Neo4jSourceQueryInfo(pluginConfig);
-        this.rowType = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+    public String getPluginName() {
+        return PLUGIN_NAME;
     }
 
     @Override
@@ -81,8 +56,8 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.rowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
index 3aa00cc30d..d7e924d76e 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
@@ -20,40 +20,41 @@ package 
org.apache.seatunnel.connectors.seatunnel.neo4j.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class Neo4jSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return PLUGIN_NAME;
+        return Neo4jSourceOptions.PLUGIN_NAME;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(KEY_NEO4J_URI, KEY_DATABASE, KEY_QUERY, 
ConnectorCommonOptions.SCHEMA)
+                .required(
+                        Neo4jSourceOptions.KEY_NEO4J_URI,
+                        Neo4jSourceOptions.KEY_DATABASE,
+                        Neo4jSourceOptions.KEY_QUERY,
+                        ConnectorCommonOptions.SCHEMA)
                 .optional(
-                        KEY_USERNAME,
-                        KEY_PASSWORD,
-                        KEY_BEARER_TOKEN,
-                        KEY_KERBEROS_TICKET,
-                        KEY_MAX_CONNECTION_TIMEOUT,
-                        KEY_MAX_TRANSACTION_RETRY_TIME)
+                        Neo4jSourceOptions.KEY_USERNAME,
+                        Neo4jSourceOptions.KEY_PASSWORD,
+                        Neo4jSourceOptions.KEY_BEARER_TOKEN,
+                        Neo4jSourceOptions.KEY_KERBEROS_TICKET,
+                        Neo4jSourceOptions.KEY_MAX_CONNECTION_TIMEOUT,
+                        Neo4jSourceOptions.KEY_MAX_TRANSACTION_RETRY_TIME)
                 .build();
     }
 
@@ -61,4 +62,16 @@ public class Neo4jSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return Neo4jSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        Neo4jSourceQueryInfo neo4jSourceQueryInfo =
+                new Neo4jSourceQueryInfo(context.getOptions().toConfig());
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new Neo4jSource(
+                                
CatalogTableUtil.buildWithConfig(context.getOptions()),
+                                neo4jSourceQueryInfo);
+    }
 }

Reply via email to