This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1eb81e7f88 [improve] neo4j options (#9164)
1eb81e7f88 is described below
commit 1eb81e7f88cfa147c3657d17168005a7c4d42e62
Author: Jarvis <[email protected]>
AuthorDate: Tue Apr 29 09:51:14 2025 +0800
[improve] neo4j options (#9164)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
...eo4jCommonConfig.java => Neo4jBaseOptions.java} | 2 +-
.../seatunnel/neo4j/config/Neo4jQueryInfo.java | 20 ++++----
...{Neo4jSinkConfig.java => Neo4jSinkOptions.java} | 2 +-
.../seatunnel/neo4j/config/Neo4jSinkQueryInfo.java | 8 ++--
...4jSourceConfig.java => Neo4jSourceOptions.java} | 2 +-
.../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 31 ++++--------
.../seatunnel/neo4j/sink/Neo4jSinkFactory.java | 43 +++++++++--------
.../seatunnel/neo4j/sink/Neo4jSinkWriter.java | 2 +-
.../seatunnel/neo4j/source/Neo4jSource.java | 55 ++++++----------------
.../seatunnel/neo4j/source/Neo4jSourceFactory.java | 49 ++++++++++++-------
11 files changed, 96 insertions(+), 120 deletions(-)
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 0a42dfa8d7..d791edf008 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,7 +194,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("TypesenseSinkOptions");
whiteList.add("PulsarSinkOptions");
whiteList.add("SlsSinkOptions");
- whiteList.add("Neo4jSinkOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("SlsSourceOptions");
@@ -205,7 +204,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("RocketMqSourceOptions");
whiteList.add("TablestoreSinkOptions");
whiteList.add("TableStoreDBSourceOptions");
- whiteList.add("Neo4jSourceOptions");
whiteList.add("SocketSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
similarity index 98%
rename from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
rename to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
index dcc7828cd2..b844d723bd 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jBaseOptions.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public abstract class Neo4jCommonConfig {
+public abstract class Neo4jBaseOptions {
public static final String PLUGIN_NAME = "Neo4j";
public static final Long DEFAULT_MAX_TRANSACTION_RETRY_TIME = 30L;
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
index 9730bfa110..f3118d56ca 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
@@ -32,16 +32,16 @@ import lombok.Data;
import java.io.Serializable;
import java.net.URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_BEARER_TOKEN;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_DATABASE;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_KERBEROS_TICKET;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_MAX_CONNECTION_TIMEOUT;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_NEO4J_URI;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_QUERY;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.KEY_USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
/**
* Because Neo4jQueryInfo is one of the Neo4jSink's member variable, So
Neo4jQueryInfo need
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
similarity index 97%
rename from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
rename to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
index 8d8c42b0e3..87e240436b 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkOptions.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
import java.util.Map;
-public class Neo4jSinkConfig extends Neo4jCommonConfig {
+public class Neo4jSinkOptions extends Neo4jBaseOptions {
public static final Option<Map<String, String>> QUERY_PARAM_POSITION =
Options.key("queryParamPosition")
.mapType()
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
index 997f7cc8ce..674b96442a 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
@@ -31,10 +31,10 @@ import lombok.Setter;
import java.util.Map;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.MAX_BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.WRITE_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.QUERY_PARAM_POSITION;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions.WRITE_MODE;
@Getter
@Setter
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
similarity index 93%
rename from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
rename to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
index de29a71ce6..a919e178a7 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceOptions.java
@@ -17,4 +17,4 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
-public class Neo4jSourceConfig extends Neo4jCommonConfig {}
+public class Neo4jSourceOptions extends Neo4jBaseOptions {}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index c3af6c7a90..1b422663e3 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -17,52 +17,39 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
-
-@AutoService(SeaTunnelSink.class)
public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void,
Void> {
- private SeaTunnelRowType rowType;
+ private CatalogTable catalogTable;
private Neo4jSinkQueryInfo neo4JSinkQueryInfo;
- @Override
- public String getPluginName() {
- return PLUGIN_NAME;
+ public Neo4jSink(CatalogTable catalogTable, Neo4jSinkQueryInfo
neo4JSinkQueryInfo) {
+ this.catalogTable = catalogTable;
+ this.neo4JSinkQueryInfo = neo4JSinkQueryInfo;
}
@Override
- public void prepare(Config config) throws PrepareFailException {
- this.neo4JSinkQueryInfo = new Neo4jSinkQueryInfo(config);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.rowType = seaTunnelRowType;
+ public String getPluginName() {
+ return Neo4jSinkOptions.PLUGIN_NAME;
}
@Override
public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
+ return new Neo4jSinkWriter(neo4JSinkQueryInfo,
catalogTable.getSeaTunnelRowType());
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return SeaTunnelSink.super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
index 70c048dd92..f2623c32f5 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkFactory.java
@@ -18,41 +18,44 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_NEO4J_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
-
@AutoService(Factory.class)
public class Neo4jSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return PLUGIN_NAME;
+ return Neo4jSinkOptions.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KEY_NEO4J_URI, KEY_DATABASE, KEY_QUERY,
QUERY_PARAM_POSITION)
+ .required(
+ Neo4jSinkOptions.KEY_NEO4J_URI,
+ Neo4jSinkOptions.KEY_DATABASE,
+ Neo4jSinkOptions.KEY_QUERY,
+ Neo4jSinkOptions.QUERY_PARAM_POSITION)
.optional(
- KEY_USERNAME,
- KEY_PASSWORD,
- KEY_BEARER_TOKEN,
- KEY_KERBEROS_TICKET,
- KEY_MAX_CONNECTION_TIMEOUT,
- KEY_MAX_TRANSACTION_RETRY_TIME)
+ Neo4jSinkOptions.KEY_USERNAME,
+ Neo4jSinkOptions.KEY_PASSWORD,
+ Neo4jSinkOptions.KEY_BEARER_TOKEN,
+ Neo4jSinkOptions.KEY_KERBEROS_TICKET,
+ Neo4jSinkOptions.KEY_MAX_CONNECTION_TIMEOUT,
+ Neo4jSinkOptions.KEY_MAX_TRANSACTION_RETRY_TIME)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ Neo4jSinkQueryInfo neo4jSinkQueryInfo =
+ new Neo4jSinkQueryInfo(context.getOptions().toConfig());
+ return () -> new Neo4jSink(context.getCatalogTable(),
neo4jSinkQueryInfo);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
index ccd5707930..2fbe1ebe65 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -46,7 +46,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jBaseOptions.PLUGIN_NAME;
@Slf4j
public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index a4e1f39a14..65fd0ecf4c 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -17,62 +17,37 @@
package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
-import
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
-import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceOptions.PLUGIN_NAME;
-@AutoService(SeaTunnelSource.class)
public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
- private Neo4jSourceQueryInfo neo4jSourceQueryInfo;
- private SeaTunnelRowType rowType;
+ private final CatalogTable catalogTable;
+ private final Neo4jSourceQueryInfo neo4jSourceQueryInfo;
+ private final SeaTunnelRowType rowType;
- @Override
- public String getPluginName() {
- return PLUGIN_NAME;
+ public Neo4jSource(CatalogTable catalogTable, Neo4jSourceQueryInfo
neo4jSourceQueryInfo) {
+ this.catalogTable = catalogTable;
+ this.neo4jSourceQueryInfo = neo4jSourceQueryInfo;
+ this.rowType = catalogTable.getSeaTunnelRowType();
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
-
- final CheckResult configCheck =
- CheckConfigUtil.checkAllExists(pluginConfig,
ConnectorCommonOptions.SCHEMA.key());
-
- if (!configCheck.isSuccess()) {
- throw new Neo4jConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- Neo4jSourceConfig.PLUGIN_NAME,
- PluginType.SOURCE,
- configCheck.getMsg()));
- }
-
- this.neo4jSourceQueryInfo = new Neo4jSourceQueryInfo(pluginConfig);
- this.rowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+ public String getPluginName() {
+ return PLUGIN_NAME;
}
@Override
@@ -81,8 +56,8 @@ public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.rowType;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
index 3aa00cc30d..d7e924d76e 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceFactory.java
@@ -20,40 +20,41 @@ package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
-import static
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import java.io.Serializable;
@AutoService(Factory.class)
public class Neo4jSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return PLUGIN_NAME;
+ return Neo4jSourceOptions.PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(KEY_NEO4J_URI, KEY_DATABASE, KEY_QUERY,
ConnectorCommonOptions.SCHEMA)
+ .required(
+ Neo4jSourceOptions.KEY_NEO4J_URI,
+ Neo4jSourceOptions.KEY_DATABASE,
+ Neo4jSourceOptions.KEY_QUERY,
+ ConnectorCommonOptions.SCHEMA)
.optional(
- KEY_USERNAME,
- KEY_PASSWORD,
- KEY_BEARER_TOKEN,
- KEY_KERBEROS_TICKET,
- KEY_MAX_CONNECTION_TIMEOUT,
- KEY_MAX_TRANSACTION_RETRY_TIME)
+ Neo4jSourceOptions.KEY_USERNAME,
+ Neo4jSourceOptions.KEY_PASSWORD,
+ Neo4jSourceOptions.KEY_BEARER_TOKEN,
+ Neo4jSourceOptions.KEY_KERBEROS_TICKET,
+ Neo4jSourceOptions.KEY_MAX_CONNECTION_TIMEOUT,
+ Neo4jSourceOptions.KEY_MAX_TRANSACTION_RETRY_TIME)
.build();
}
@@ -61,4 +62,16 @@ public class Neo4jSourceFactory implements TableSourceFactory {
public Class<? extends SeaTunnelSource> getSourceClass() {
return Neo4jSource.class;
}
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ Neo4jSourceQueryInfo neo4jSourceQueryInfo =
+ new Neo4jSourceQueryInfo(context.getOptions().toConfig());
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new Neo4jSource(
+
CatalogTableUtil.buildWithConfig(context.getOptions()),
+ neo4jSourceQueryInfo);
+ }
}