This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 580276a8b [Improve][connector-V2-Neo4j]Supports neo4j sink batch write 
and update docs (#4841)
580276a8b is described below

commit 580276a8bd05a0d7ec0293ec7aee54c967b17e86
Author: FuYouJ <[email protected]>
AuthorDate: Tue Jun 13 23:25:15 2023 +0800

    [Improve][connector-V2-Neo4j]Supports neo4j sink batch write and update 
docs (#4841)
---
 docs/en/connector-v2/sink/Neo4j.md                 |  69 ++++++++++---
 release-note.md                                    |   1 +
 .../seatunnel/neo4j/config/Neo4jQueryInfo.java     | 107 +++++++++++++++++++++
 .../seatunnel/neo4j/config/Neo4jSinkConfig.java    |  13 +++
 .../seatunnel/neo4j/config/Neo4jSinkQueryInfo.java |  82 +++++++++++++++-
 .../neo4j/config/Neo4jSourceQueryInfo.java         |  11 ++-
 .../CypherEnum.java}                               |  18 ++--
 .../SinkWriteMode.java}                            |   7 +-
 .../Neo4jConnectorErrorCode.java}                  |  25 +++--
 .../neo4j/internal/SeatunnelRowNeo4jValue.java     |  54 +++++++++++
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 100 +------------------
 .../seatunnel/neo4j/sink/Neo4jSinkWriter.java      |  88 +++++++++++++++--
 .../seatunnel/neo4j/source/Neo4jSource.java        |  88 +----------------
 .../seatunnel/e2e/connector/neo4j/Neo4jIT.java     |  35 +++++++
 .../resources/neo4j/fake_to_neo4j_batch_write.conf |  62 ++++++++++++
 15 files changed, 541 insertions(+), 219 deletions(-)

diff --git a/docs/en/connector-v2/sink/Neo4j.md 
b/docs/en/connector-v2/sink/Neo4j.md
index 8cfe35f7d..15e88646d 100644
--- a/docs/en/connector-v2/sink/Neo4j.md
+++ b/docs/en/connector-v2/sink/Neo4j.md
@@ -14,19 +14,21 @@ Write data to Neo4j.
 
 ## Options
 
-|            name            |  type  | required | default value |
-|----------------------------|--------|----------|---------------|
-| uri                        | String | Yes      | -             |
-| username                   | String | No       | -             |
-| password                   | String | No       | -             |
-| bearer_token               | String | No       | -             |
-| kerberos_ticket            | String | No       | -             |
-| database                   | String | Yes      | -             |
-| query                      | String | Yes      | -             |
-| queryParamPosition         | Object | Yes      | -             |
-| max_transaction_retry_time | Long   | No       | 30            |
-| max_connection_timeout     | Long   | No       | 30            |
-| common-options             | config | no       | -             |
+|            name            |  type   | required | default value |
+|----------------------------|---------|----------|---------------|
+| uri                        | String  | Yes      | -             |
+| username                   | String  | No       | -             |
+| password                   | String  | No       | -             |
+| max_batch_size             | Integer | No       | 500           |
+| write_mode                 | String  | No       | ONE_BY_ONE    |
+| bearer_token               | String  | No       | -             |
+| kerberos_ticket            | String  | No       | -             |
+| database                   | String  | Yes      | -             |
+| query                      | String  | Yes      | -             |
+| queryParamPosition         | Object  | Yes      | -             |
+| max_transaction_retry_time | Long    | No       | 30            |
+| max_connection_timeout     | Long    | No       | 30            |
+| common-options             | config  | no       | -             |
 
 ### uri [string]
 
@@ -40,6 +42,20 @@ username of the Neo4j
 
 password of the Neo4j. required if `username` is provided
 
+### max_batch_size[Integer]
+
+max_batch_size refers to the maximum number of data entries that can be 
written in a single transaction when writing to a database.
+
+### write_mode
+
+The default value is "ONE_BY_ONE". Set it to "BATCH" if you want to have the 
ability to write in batches, for example:
+
+```cypher
+unwind $ttt as row create (n:Label) set n.name = row.name,n.age = row.age
+```
+
+"ttt" represents a batch of data. "ttt" can be any arbitrary string as long as 
it matches the configured "batch_data_variable".
+
 ### bearer_token [string]
 
 base64 encoded bearer token of the Neo4j. for Auth.
@@ -76,7 +92,7 @@ The maximum amount of time to wait for a TCP connection to be 
established (secon
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
 
-## Example
+## WriteOneByOneExample
 
 ```
 sink {
@@ -98,9 +114,34 @@ sink {
 }
 ```
 
+## WriteBatchExample
+> The unwind keyword provided by Cypher supports batch writing, and the 
default variable for a batch of data is batch. If you write a batch write 
statement, then you should declare cypher: unwind $batch as row to do something
+```
+sink {
+  Neo4j {
+    uri = "bolt://localhost:7687"
+    username = "neo4j"
+    password = "neo4j"
+    database = "neo4j"
+    max_batch_size = 1000
+    write_mode = "BATCH"
+
+    max_transaction_retry_time = 3
+    max_connection_timeout = 10
+
+    query = "unwind $batch as row  create(n:MyLabel) set n.name = 
row.name,n.age = row.age"
+
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
 - Add Neo4j Sink Connector
 
+### issue #4835
+
+- Sink supports batch write
+
diff --git a/release-note.md b/release-note.md
index 858e38c97..80f1479af 100644
--- a/release-note.md
+++ b/release-note.md
@@ -92,6 +92,7 @@
 - [Connector-V2] [Doris] Add a jobId to the doris label to distinguish between 
tasks (#4839) (#4853)
 - [Connector-v2] [Mongodb]Refactor mongodb connector (#4620)
 - [Connector-v2] [Jdbc] Populate primary key when jdbc sink is created using 
CatalogTable (#4755)
+- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
 - [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion 
bug of transform (#4490)
 
 ### CI
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
index 4e56da64b..9730bfa11 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jQueryInfo.java
@@ -17,9 +17,31 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+
+import org.neo4j.driver.AuthTokens;
+
 import lombok.Data;
 
 import java.io.Serializable;
+import java.net.URI;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
 
 /**
  * Because Neo4jQueryInfo is one of the Neo4jSink's member variable, So 
Neo4jQueryInfo need
@@ -29,4 +51,89 @@ import java.io.Serializable;
 public abstract class Neo4jQueryInfo implements Serializable {
     protected DriverBuilder driverBuilder;
     protected String query;
+
+    protected PluginType pluginType;
+
+    public Neo4jQueryInfo(Config config, PluginType pluginType) {
+        this.pluginType = pluginType;
+        this.driverBuilder = prepareDriver(config, pluginType);
+        this.query = prepareQuery(config, pluginType);
+    }
+
+    // which is identical to the prepareDriver methods of the source and sink.
+    // the only difference is the pluginType mentioned in the error messages.
+    // so move code to here
+    protected DriverBuilder prepareDriver(Config config, PluginType 
pluginType) {
+        final CheckResult uriConfigCheck =
+                CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(), 
KEY_DATABASE.key());
+        final CheckResult authConfigCheck =
+                CheckConfigUtil.checkAtLeastOneExists(
+                        config,
+                        KEY_USERNAME.key(),
+                        KEY_BEARER_TOKEN.key(),
+                        KEY_KERBEROS_TICKET.key());
+        final CheckResult mergedConfigCheck =
+                CheckConfigUtil.mergeCheckResults(uriConfigCheck, 
authConfigCheck);
+        if (!mergedConfigCheck.isSuccess()) {
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, pluginType, 
mergedConfigCheck.getMsg()));
+        }
+
+        final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
+
+        final DriverBuilder driverBuilder = DriverBuilder.create(uri);
+
+        if (config.hasPath(KEY_USERNAME.key())) {
+            final CheckResult pwParamCheck =
+                    CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
+            if (!pwParamCheck.isSuccess()) {
+                throw new Neo4jConnectorException(
+                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                        String.format(
+                                "PluginName: %s, PluginType: %s, Message: %s",
+                                PLUGIN_NAME, pluginType, 
pwParamCheck.getMsg()));
+            }
+            final String username = config.getString(KEY_USERNAME.key());
+            final String password = config.getString(KEY_PASSWORD.key());
+
+            driverBuilder.setUsername(username);
+            driverBuilder.setPassword(password);
+        } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
+            final String bearerToken = 
config.getString(KEY_BEARER_TOKEN.key());
+            AuthTokens.bearer(bearerToken);
+            driverBuilder.setBearerToken(bearerToken);
+        } else {
+            final String kerberosTicket = 
config.getString(KEY_KERBEROS_TICKET.key());
+            AuthTokens.kerberos(kerberosTicket);
+            driverBuilder.setBearerToken(kerberosTicket);
+        }
+
+        driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
+
+        if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
+            driverBuilder.setMaxConnectionTimeoutSeconds(
+                    config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
+        }
+        if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
+            driverBuilder.setMaxTransactionRetryTimeSeconds(
+                    config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
+        }
+
+        return driverBuilder;
+    }
+
+    private String prepareQuery(Config config, PluginType pluginType) {
+        CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, 
KEY_QUERY.key());
+        if (!queryConfigCheck.isSuccess()) {
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, pluginType, 
queryConfigCheck.getMsg()));
+        }
+        return config.getString(KEY_QUERY.key());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
index 74726b558..8d8c42b0e 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
 
 import java.util.Map;
 
@@ -29,4 +30,16 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("max_batch_size")
+                    .intType()
+                    .defaultValue(500)
+                    .withDescription("neo4j write max batch size");
+    public static final Option<SinkWriteMode> WRITE_MODE =
+            Options.key("write_mode")
+                    .enumType(SinkWriteMode.class)
+                    .defaultValue(SinkWriteMode.ONE_BY_ONE)
+                    .withDescription(
+                            "The write mode on the sink end is oneByOne by 
default in order to maintain compatibility with previous code.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
index 44c0c7294..997f7cc8c 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
@@ -17,11 +17,89 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
-import lombok.Data;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+
+import lombok.Getter;
+import lombok.Setter;
 
 import java.util.Map;
 
-@Data
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.MAX_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.WRITE_MODE;
+
+@Getter
+@Setter
 public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
+
     private Map<String, Object> queryParamPosition;
+    private Integer maxBatchSize;
+
+    private SinkWriteMode writeMode;
+
+    public boolean batchMode() {
+        return SinkWriteMode.BATCH.equals(writeMode);
+    }
+
+    public Neo4jSinkQueryInfo(Config config) {
+        super(config, PluginType.SINK);
+
+        this.writeMode = prepareWriteMode(config);
+
+        if (SinkWriteMode.BATCH.equals(writeMode)) {
+            prepareBatchWriteConfig(config);
+        } else {
+            prepareOneByOneConfig(config);
+        }
+    }
+
+    private void prepareOneByOneConfig(Config config) {
+
+        CheckResult queryConfigCheck =
+                CheckConfigUtil.checkAllExists(config, 
QUERY_PARAM_POSITION.key());
+
+        if (!queryConfigCheck.isSuccess()) {
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
+        }
+
+        // set queryParamPosition
+        this.queryParamPosition = 
config.getObject(QUERY_PARAM_POSITION.key()).unwrapped();
+    }
+
+    private void prepareBatchWriteConfig(Config config) {
+
+        // batch size
+        if (config.hasPath(MAX_BATCH_SIZE.key())) {
+            int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+            if (batchSize <= 0) {
+                throw new Neo4jConnectorException(
+                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                        String.format(
+                                "PluginName: %s, PluginType: %s, Message: %s",
+                                PLUGIN_NAME, PluginType.SINK, "maxBatchSize 
must greater than 0"));
+            }
+            this.maxBatchSize = batchSize;
+        } else {
+            this.maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+        }
+    }
+
+    private SinkWriteMode prepareWriteMode(Config config) {
+        if (config.hasPath(WRITE_MODE.key())) {
+            return config.getEnum(SinkWriteMode.class, WRITE_MODE.key());
+        }
+        return WRITE_MODE.defaultValue();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
index 8288ad01a..cd98f54f9 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
@@ -17,4 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
-public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {}
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.common.constants.PluginType;
+
+public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {
+
+    public Neo4jSourceQueryInfo(Config pluginConfig) {
+        super(pluginConfig, PluginType.SOURCE);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
similarity index 65%
copy from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
copy to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
index 44c0c7294..01c3cf4ce 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/CypherEnum.java
@@ -15,13 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+package org.apache.seatunnel.connectors.seatunnel.neo4j.constants;
 
-import lombok.Data;
+public enum CypherEnum {
+    BATCH("batch", "a variable in cypher that represents a batch of data");
+    private final String value;
+    private final String description;
 
-import java.util.Map;
+    CypherEnum(String value, String description) {
+        this.value = value;
+        this.description = description;
+    }
 
-@Data
-public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
-    private Map<String, Object> queryParamPosition;
+    public String getValue() {
+        return value;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
similarity index 86%
copy from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
copy to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
index 8288ad01a..4975fb113 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/constants/SinkWriteMode.java
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+package org.apache.seatunnel.connectors.seatunnel.neo4j.constants;
 
-public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {}
+public enum SinkWriteMode {
+    ONE_BY_ONE,
+    BATCH
+}
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
similarity index 56%
copy from 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
copy to 
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
index 44c0c7294..93f83131e 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkQueryInfo.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorErrorCode.java
@@ -14,14 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.seatunnel.connectors.seatunnel.neo4j.exception;
 
-package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
-import lombok.Data;
+public enum Neo4jConnectorErrorCode implements SeaTunnelErrorCode {
+    DATE_BASE_ERROR("NEO4J-01", "Neo4j Database Error");
+    private final String code;
+    private final String description;
 
-import java.util.Map;
+    Neo4jConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
 
-@Data
-public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {
-    private Map<String, Object> queryParamPosition;
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
new file mode 100644
index 000000000..667239ea2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/internal/SeatunnelRowNeo4jValue.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.neo4j.internal;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.internal.AsValue;
+import org.neo4j.driver.internal.util.Iterables;
+import org.neo4j.driver.internal.value.MapValue;
+
+import java.util.Map;
+
+/**
+ * This class includes the seatunnelRow and implements the 
neo4j.driver.internal.AsValue interface.
+ * This class will be able to convert to neo4j.driver.Value quickly without 
any extra effort.
+ */
+public class SeatunnelRowNeo4jValue implements AsValue {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRow seaTunnelRow;
+
+    public SeatunnelRowNeo4jValue(SeaTunnelRowType seaTunnelRowType, 
SeaTunnelRow seaTunnelRow) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.seaTunnelRow = seaTunnelRow;
+    }
+
+    @Override
+    public Value asValue() {
+        int length = seaTunnelRowType.getTotalFields();
+        Map<String, Value> valueMap = Iterables.newHashMapWithSize(length);
+        for (int i = 0; i < length; i++) {
+            String name = seaTunnelRowType.getFieldName(i);
+            Value value = Values.value(seaTunnelRow.getField(i));
+            valueMap.put(name, value);
+        }
+        return new MapValue(valueMap);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index adba115ef..4ab0070d7 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -20,43 +20,24 @@ package 
org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
-import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
-
-import org.neo4j.driver.AuthTokens;
 
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
-import java.net.URI;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_BEARER_TOKEN;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_KERBEROS_TICKET;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_NEO4J_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_USERNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
 
 @AutoService(SeaTunnelSink.class)
 public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, 
Void> {
 
     private SeaTunnelRowType rowType;
-    private final Neo4jSinkQueryInfo neo4JSinkQueryInfo = new 
Neo4jSinkQueryInfo();
+    private Neo4jSinkQueryInfo neo4JSinkQueryInfo;
 
     @Override
     public String getPluginName() {
@@ -65,82 +46,7 @@ public class Neo4jSink implements 
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
-
-        final CheckResult queryConfigCheck =
-                CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(), 
QUERY_PARAM_POSITION.key());
-        if (!queryConfigCheck.isSuccess()) {
-            throw new Neo4jConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg()));
-        }
-        neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
-        neo4JSinkQueryInfo.setQueryParamPosition(
-                config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
-    }
-
-    private DriverBuilder prepareDriver(Config config) {
-        final CheckResult uriConfigCheck =
-                CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(), 
KEY_DATABASE.key());
-        final CheckResult authConfigCheck =
-                CheckConfigUtil.checkAtLeastOneExists(
-                        config,
-                        KEY_USERNAME.key(),
-                        KEY_BEARER_TOKEN.key(),
-                        KEY_KERBEROS_TICKET.key());
-        final CheckResult mergedConfigCheck =
-                CheckConfigUtil.mergeCheckResults(uriConfigCheck, 
authConfigCheck);
-        if (!mergedConfigCheck.isSuccess()) {
-            throw new Neo4jConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            PLUGIN_NAME, PluginType.SINK, 
mergedConfigCheck.getMsg()));
-        }
-
-        final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
-
-        final DriverBuilder driverBuilder = DriverBuilder.create(uri);
-
-        if (config.hasPath(KEY_USERNAME.key())) {
-            final CheckResult pwParamCheck =
-                    CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
-            if (!pwParamCheck.isSuccess()) {
-                throw new Neo4jConnectorException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        String.format(
-                                "PluginName: %s, PluginType: %s, Message: %s",
-                                PLUGIN_NAME, PluginType.SINK, 
pwParamCheck.getMsg()));
-            }
-            final String username = config.getString(KEY_USERNAME.key());
-            final String password = config.getString(KEY_PASSWORD.key());
-
-            driverBuilder.setUsername(username);
-            driverBuilder.setPassword(password);
-        } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
-            final String bearerToken = 
config.getString(KEY_BEARER_TOKEN.key());
-            AuthTokens.bearer(bearerToken);
-            driverBuilder.setBearerToken(bearerToken);
-        } else {
-            final String kerberosTicket = 
config.getString(KEY_KERBEROS_TICKET.key());
-            AuthTokens.kerberos(kerberosTicket);
-            driverBuilder.setBearerToken(kerberosTicket);
-        }
-
-        driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
-
-        if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
-            driverBuilder.setMaxConnectionTimeoutSeconds(
-                    config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
-        }
-        if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
-            driverBuilder.setMaxTransactionRetryTimeSeconds(
-                    config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
-        }
-
-        return driverBuilder;
+        this.neo4JSinkQueryInfo = new Neo4jSinkQueryInfo(config);
     }
 
     @Override
@@ -156,6 +62,6 @@ public class Neo4jSink implements 
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
     @Override
     public SinkWriter<SeaTunnelRow, Void, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new Neo4jSinkWriter(neo4JSinkQueryInfo);
+        return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
index 528fe38a7..3cc6e82bd 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -17,22 +17,37 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.CypherEnum;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.internal.SeatunnelRowNeo4jValue;
 
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.Query;
 import org.neo4j.driver.Session;
 import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.Neo4jException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+
 @Slf4j
 public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
 
@@ -40,17 +55,33 @@ public class Neo4jSinkWriter implements 
SinkWriter<SeaTunnelRow, Void, Void> {
     private final transient Driver driver;
     private final transient Session session;
 
-    public Neo4jSinkWriter(Neo4jSinkQueryInfo neo4jSinkQueryInfo) {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final List<SeatunnelRowNeo4jValue> writeBuffer;
+    private final Integer maxBatchSize;
+
+    public Neo4jSinkWriter(
+            Neo4jSinkQueryInfo neo4jSinkQueryInfo, SeaTunnelRowType 
seaTunnelRowType) {
         this.neo4jSinkQueryInfo = neo4jSinkQueryInfo;
         this.driver = this.neo4jSinkQueryInfo.getDriverBuilder().build();
         this.session =
                 driver.session(
                         SessionConfig.forDatabase(
                                 
neo4jSinkQueryInfo.getDriverBuilder().getDatabase()));
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.maxBatchSize = 
Optional.ofNullable(neo4jSinkQueryInfo.getMaxBatchSize()).orElse(0);
+        this.writeBuffer = new ArrayList<>(maxBatchSize);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (neo4jSinkQueryInfo.batchMode()) {
+            writeByBatchSize(element);
+        } else {
+            writeOneByOne(element);
+        }
+    }
+
+    private void writeOneByOne(SeaTunnelRow element) {
         final Map<String, Object> queryParamPosition =
                 neo4jSinkQueryInfo.getQueryParamPosition().entrySet().stream()
                         .collect(
@@ -58,11 +89,47 @@ public class Neo4jSinkWriter implements 
SinkWriter<SeaTunnelRow, Void, Void> {
                                         Map.Entry::getKey,
                                         e -> element.getField((Integer) 
e.getValue())));
         final Query query = new Query(neo4jSinkQueryInfo.getQuery(), 
queryParamPosition);
-        session.writeTransaction(
-                tx -> {
-                    tx.run(query);
-                    return null;
-                });
+        writeByQuery(query);
+    }
+
+    private void writeByBatchSize(SeaTunnelRow element) {
+        writeBuffer.add(new SeatunnelRowNeo4jValue(seaTunnelRowType, element));
+        tryWriteByBatchSize();
+    }
+
+    private void tryWriteByBatchSize() {
+        if (!writeBuffer.isEmpty() && writeBuffer.size() >= maxBatchSize) {
+            Query query = batchQuery();
+            writeByQuery(query);
+            writeBuffer.clear();
+        }
+    }
+
+    private Query batchQuery() {
+        try {
+            Value batchValues = Values.parameters(CypherEnum.BATCH.getValue(), 
writeBuffer);
+            return new Query(neo4jSinkQueryInfo.getQuery(), batchValues);
+        } catch (ClientException e) {
+            log.error("Failed to build cypher statement", e);
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            PLUGIN_NAME, PluginType.SINK, e.getMessage()));
+        }
+    }
+
+    private void writeByQuery(Query query) {
+        try {
+            session.writeTransaction(
+                    tx -> {
+                        tx.run(query);
+                        return null;
+                    });
+        } catch (Neo4jException e) {
+            throw new Neo4jConnectorException(
+                    Neo4jConnectorErrorCode.DATE_BASE_ERROR, e.getMessage());
+        }
     }
 
     @Override
@@ -75,7 +142,16 @@ public class Neo4jSinkWriter implements 
SinkWriter<SeaTunnelRow, Void, Void> {
 
     @Override
     public void close() throws IOException {
+        flushWriteBuffer();
         session.close();
         driver.close();
     }
+
+    private void flushWriteBuffer() {
+        if (!writeBuffer.isEmpty()) {
+            Query query = batchQuery();
+            writeByQuery(query);
+            writeBuffer.clear();
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index 2e5f88705..12100afe2 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -34,33 +34,19 @@ import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
-import org.neo4j.driver.AuthTokens;
-
 import com.google.auto.service.AutoService;
 
-import java.net.URI;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_BEARER_TOKEN;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_KERBEROS_TICKET;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_NEO4J_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_QUERY;
-import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_USERNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
 
 @AutoService(SeaTunnelSource.class)
 public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
 
-    private final Neo4jSourceQueryInfo neo4jSourceQueryInfo = new 
Neo4jSourceQueryInfo();
+    private Neo4jSourceQueryInfo neo4jSourceQueryInfo;
     private SeaTunnelRowType rowType;
 
     @Override
@@ -70,11 +56,9 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        neo4jSourceQueryInfo.setDriverBuilder(prepareDriver(pluginConfig));
 
         final CheckResult configCheck =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, KEY_QUERY.key(), 
CatalogTableUtil.SCHEMA.key());
+                CheckConfigUtil.checkAllExists(pluginConfig, 
CatalogTableUtil.SCHEMA.key());
 
         if (!configCheck.isSuccess()) {
             throw new Neo4jConnectorException(
@@ -85,8 +69,8 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
                             PluginType.SOURCE,
                             configCheck.getMsg()));
         }
-        neo4jSourceQueryInfo.setQuery(pluginConfig.getString(KEY_QUERY.key()));
 
+        this.neo4jSourceQueryInfo = new Neo4jSourceQueryInfo(pluginConfig);
         this.rowType = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
     }
 
@@ -105,70 +89,4 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
             SingleSplitReaderContext readerContext) throws Exception {
         return new Neo4jSourceReader(readerContext, neo4jSourceQueryInfo, 
rowType);
     }
-
-    private DriverBuilder prepareDriver(Config config) {
-        final CheckResult uriConfigCheck =
-                CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(), 
KEY_DATABASE.key());
-        final CheckResult authConfigCheck =
-                CheckConfigUtil.checkAtLeastOneExists(
-                        config,
-                        KEY_USERNAME.key(),
-                        KEY_BEARER_TOKEN.key(),
-                        KEY_KERBEROS_TICKET.key());
-        final CheckResult mergedConfigCheck =
-                CheckConfigUtil.mergeCheckResults(uriConfigCheck, 
authConfigCheck);
-        if (!mergedConfigCheck.isSuccess()) {
-            throw new Neo4jConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            Neo4jSourceConfig.PLUGIN_NAME,
-                            PluginType.SOURCE,
-                            mergedConfigCheck.getMsg()));
-        }
-
-        final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
-
-        final DriverBuilder driverBuilder = DriverBuilder.create(uri);
-
-        if (config.hasPath(KEY_USERNAME.key())) {
-            final CheckResult pwParamCheck =
-                    CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
-            if (!pwParamCheck.isSuccess()) {
-                throw new Neo4jConnectorException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        String.format(
-                                "PluginName: %s, PluginType: %s, Message: %s",
-                                Neo4jSourceConfig.PLUGIN_NAME,
-                                PluginType.SOURCE,
-                                pwParamCheck.getMsg()));
-            }
-            final String username = config.getString(KEY_USERNAME.key());
-            final String password = config.getString(KEY_PASSWORD.key());
-
-            driverBuilder.setUsername(username);
-            driverBuilder.setPassword(password);
-        } else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
-            final String bearerToken = 
config.getString(KEY_BEARER_TOKEN.key());
-            AuthTokens.bearer(bearerToken);
-            driverBuilder.setBearerToken(bearerToken);
-        } else {
-            final String kerberosTicket = 
config.getString(KEY_KERBEROS_TICKET.key());
-            AuthTokens.kerberos(kerberosTicket);
-            driverBuilder.setBearerToken(kerberosTicket);
-        }
-
-        driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));
-
-        if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
-            driverBuilder.setMaxConnectionTimeoutSeconds(
-                    config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
-        }
-        if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
-            driverBuilder.setMaxTransactionRetryTimeSeconds(
-                    config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
-        }
-
-        return driverBuilder;
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
index a99d79c46..d70b550d4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/java/org/apache/seatunnel/e2e/connector/neo4j/Neo4jIT.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.TestTemplate;
 import org.neo4j.driver.AuthTokens;
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Record;
 import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
 import org.neo4j.driver.SessionConfig;
@@ -54,12 +55,15 @@ import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.neo4j.driver.Values.parameters;
 
 @Slf4j
 public class Neo4jIT extends TestSuiteBase implements TestResource {
 
+    private static final int FAKE_ROW_NUM = 1000;
+
     private static final String CONTAINER_IMAGE = "neo4j:5.6.0";
     private static final String CONTAINER_HOST = "neo4j-host";
     private static final int HTTP_PORT = 7474;
@@ -153,6 +157,37 @@ public class Neo4jIT extends TestSuiteBase implements 
TestResource {
         assertEquals(Float.MAX_VALUE, tt.get("float").asFloat());
     }
 
+    @TestTemplate
+    public void testBatchWrite(TestContainer container) throws IOException, 
InterruptedException {
+        // clean test data before test
+        final Result checkExists = neo4jSession.run("MATCH (n:BatchLabel) 
RETURN n limit 1");
+        if (checkExists.hasNext()) {
+            neo4jSession.run("MATCH (n:BatchLabel) delete n");
+        }
+
+        // unwind $batch as row create(n:BatchLabel) set n.name = 
row.name,n.age = row.age
+        Container.ExecResult execResult =
+                container.executeJob("/neo4j/fake_to_neo4j_batch_write.conf");
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());
+        final Result result = neo4jSession.run("MATCH (n:BatchLabel) RETURN 
n");
+        // nodes
+        assertTrue(result.hasNext());
+        int cnt = 0;
+        // verify the attributes of the node
+        while (result.hasNext()) {
+            // Don't remove the import of org.neo4j.driver.Record; doing so can cause the code 
not to compile on
+            // Java 14+.
+            Record r = result.next();
+            String name = r.get("n").get("name").asString();
+            assertNotNull(name);
+            Object age = r.get("n").get("age").asObject();
+            assertNotNull(age);
+            cnt++;
+        }
+        assertEquals(FAKE_ROW_NUM, cnt);
+    }
+
     @AfterAll
     @Override
     public void tearDown() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
new file mode 100644
index 000000000..e1d9fed6f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
SeaTunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    row.num = 1000
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Neo4j {
+    uri = "neo4j://neo4j-host:7687"
+    username = "neo4j"
+    password = "Test@12343"
+    database = "neo4j"
+    # Set it to 101 for testing code only.
+    max_batch_size = 101
+    write_mode = "BATCH"
+
+    max_transaction_retry_time = 3
+    max_connection_timeout = 1
+
+    query = "unwind $batch as row  create(n:BatchLabel) set n.name = 
row.name,n.age = row.age"
+  }
+}
\ No newline at end of file

Reply via email to