FuYouJ commented on code in PR #4841:
URL: https://github.com/apache/seatunnel/pull/4841#discussion_r1224262358
##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java:
##########
@@ -65,22 +68,78 @@ public String getPluginName() {
@Override
public void prepare(Config config) throws PrepareFailException {
- neo4JSinkQueryInfo.setDriverBuilder(prepareDriver(config));
- final CheckResult queryConfigCheck =
+ // check username password query and init driver
+ DriverBuilder driverBuilder = prepareDriver(config);
+ neo4JSinkQueryInfo.setDriverBuilder(driverBuilder);
+ setNeo4jWriteMode(config);
+
+ if (neo4JSinkQueryInfo.batchMode()) {
+ prepareBatchModeConfigParams(config);
+ } else {
+ prepareWriteOneByOneConfigParams(config);
+ }
+ }
+
+ private void setNeo4jWriteMode(Config config) {
+ if (config.hasPath(MAX_BATCH_SIZE.key())) {
+ int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+ if (batchSize <= 0) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK, "maxBatchSize
must greater than 0"));
+ }
+ neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
+ neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.Batch);
+ } else {
+ neo4JSinkQueryInfo.setWriteMode(SinkWriteMode.OneByOne);
+ }
+ }
+
+ private void prepareWriteOneByOneConfigParams(Config config) {
+
+ CheckResult queryConfigCheck =
CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(),
QUERY_PARAM_POSITION.key());
+
if (!queryConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
}
+ // set query
neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
+ // set queryParamPosition
neo4JSinkQueryInfo.setQueryParamPosition(
config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
}
+ private void prepareBatchModeConfigParams(Config config) {
+
+ CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config,
KEY_QUERY.key());
+ if (!queryConfigCheck.isSuccess()) {
+ throw new Neo4jConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ PLUGIN_NAME, PluginType.SINK,
queryConfigCheck.getMsg()));
+ }
+
+ int batchSize = config.getInt(MAX_BATCH_SIZE.key());
+ neo4JSinkQueryInfo.setMaxBatchSize(batchSize);
Review Comment:
> This code snippet can be extracted into neo4JSinkQueryInfo; for more
details you can refer to HttpParamters
I've refactored the code. It is now cleaner and easier to read, and the
documentation has been updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]