TyrantLucifer commented on code in PR #4841:
URL: https://github.com/apache/seatunnel/pull/4841#discussion_r1225303367


##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java:
##########
@@ -17,52 +17,119 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.internal.SeatunnelRowNeo4jValue;
 
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.Query;
 import org.neo4j.driver.Session;
 import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.Values;
+import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.Neo4jException;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
+
 @Slf4j
 public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
 
     private final Neo4jSinkQueryInfo neo4jSinkQueryInfo;
     private final transient Driver driver;
     private final transient Session session;
 
-    public Neo4jSinkWriter(Neo4jSinkQueryInfo neo4jSinkQueryInfo) {
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final List<SeatunnelRowNeo4jValue> writeBuffer;
+    private final Integer maxBatchSize;
+
+    public Neo4jSinkWriter(
+            Neo4jSinkQueryInfo neo4jSinkQueryInfo, SeaTunnelRowType 
seaTunnelRowType) {
         this.neo4jSinkQueryInfo = neo4jSinkQueryInfo;
         this.driver = this.neo4jSinkQueryInfo.getDriverBuilder().build();
         this.session =
                 driver.session(
                         SessionConfig.forDatabase(
                                 
neo4jSinkQueryInfo.getDriverBuilder().getDatabase()));
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.maxBatchSize = 
Optional.ofNullable(neo4jSinkQueryInfo.getMaxBatchSize()).orElse(0);
+        this.writeBuffer = new ArrayList<>(maxBatchSize);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (neo4jSinkQueryInfo.batchMode()) {
+            writeByBatchSize(element);
+        } else {
+            writeOneByOne(element);
+        }
+    }
+
+    private void writeOneByOne(SeaTunnelRow element) {
         final Map<String, Object> queryParamPosition =
                 neo4jSinkQueryInfo.getQueryParamPosition().entrySet().stream()
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,
                                         e -> element.getField((Integer) 
e.getValue())));
         final Query query = new Query(neo4jSinkQueryInfo.getQuery(), 
queryParamPosition);
-        session.writeTransaction(
-                tx -> {
-                    tx.run(query);
-                    return null;
-                });
+        writeByQuery(query);
+    }
+
+    private void writeByBatchSize(SeaTunnelRow element) {
+        writeBuffer.add(new SeatunnelRowNeo4jValue(seaTunnelRowType, element));
+        tryWriteByBatchSize();
+    }
+
+    private void tryWriteByBatchSize() {
+        if (!writeBuffer.isEmpty() && writeBuffer.size() >= maxBatchSize) {
+            Query query = batchQuery();
+            writeByQuery(query);
+            writeBuffer.clear();
+        }
+    }
+
+    private Query batchQuery() {
+        try {
+            Value batchValues =
+                    
Values.parameters(neo4jSinkQueryInfo.getBatchDataVariable(), writeBuffer);
+            return new Query(neo4jSinkQueryInfo.getQuery(), batchValues);
+        } catch (ClientException e) {
+            log.error("Failed to build cypher statement", e);
+            throw new Neo4jConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,

Review Comment:
   Why throw a config validation failure here? I think there could be a 
problem with the neo4j server instead.



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java:
##########
@@ -29,4 +29,17 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
                     .noDefaultValue()
                     .withDescription(
                             "position mapping information for query 
parameters. key name is parameter placeholder name. associated value is 
position of field in input data row.");
+
+    public static final Option<Integer> MAX_BATCH_SIZE =
+            Options.key("maxBatchSize")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("neo4j write max batchSize");
+
+    public static final Option<String> BATCH_VARIABLE =
+            Options.key("batchVariable")
+                    .stringType()

Review Comment:
   Remove?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to