hailin0 commented on code in PR #5635:
URL: https://github.com/apache/seatunnel/pull/5635#discussion_r1364805869


##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java:
##########
@@ -66,6 +73,10 @@ public <T> T get(Option<T> option) {
         return getOptional(option).orElseGet(option::defaultValue);
     }
 
+    public <T> T get(Option<T> option, T defaultValue) {

Review Comment:
   Why not use the following instead?
   
   1. Define the default value in the Option.
   
   2. Use `getOptional(option).orElse(defaultValue)`.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, TOPIC.key(), 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.metadata.setTopic(config.getString(TOPIC.key()));
-        if (config.hasPath(PATTERN.key())) {
-            this.metadata.setPattern(config.getBoolean(PATTERN.key()));
-        } else {
-            this.metadata.setPattern(PATTERN.defaultValue());
-        }
-        
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
-        this.metadata.setProperties(new Properties());
-
-        if (config.hasPath(CONSUMER_GROUP.key())) {
-            
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
-        } else {
-            this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
-        }
-
-        if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-            
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
-        } else {
-            
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
-        }
-
-        if (config.hasPath(START_MODE.key())) {
-            StartMode startMode =
-                    
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
-            this.metadata.setStartMode(startMode);
-            switch (startMode) {
-                case TIMESTAMP:
-                    long startOffsetsTimestamp = 
config.getLong(START_MODE_TIMESTAMP.key());
-                    long currentTimestamp = System.currentTimeMillis();
-                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > 
currentTimestamp) {
-                        throw new IllegalArgumentException(
-                                "start_mode.timestamp The value is smaller 
than 0 or smaller than the current time");
-                    }
-                    
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                    break;
-                case SPECIFIC_OFFSETS:
-                    Config offsets = 
config.getConfig(START_MODE_OFFSETS.key());
-                    ConfigRenderOptions options = 
ConfigRenderOptions.concise();
-                    String offsetsJson = offsets.root().render(options);
-                    if (offsetsJson == null) {
-                        throw new IllegalArgumentException(
-                                "start mode is "
-                                        + StartMode.SPECIFIC_OFFSETS
-                                        + "but no specific offsets were 
specified.");
-                    }
-                    Map<TopicPartition, Long> specificStartOffsets = new 
HashMap<>();
-                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
-                    jsonNodes
-                            .fieldNames()
-                            .forEachRemaining(
-                                    key -> {
-                                        int splitIndex = key.lastIndexOf("-");
-                                        String topic = key.substring(0, 
splitIndex);
-                                        String partition = 
key.substring(splitIndex + 1);
-                                        long offset = 
jsonNodes.get(key).asLong();
-                                        TopicPartition topicPartition =
-                                                new TopicPartition(
-                                                        topic, 
Integer.valueOf(partition));
-                                        
specificStartOffsets.put(topicPartition, offset);
-                                    });
-                    
this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
-            this.discoveryIntervalMillis =
-                    
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
-        }
-
-        if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
-            config.getObject(KAFKA_CONFIG.key())
-                    .forEach(
-                            (key, value) ->
-                                    this.metadata.getProperties().put(key, 
value.unwrapped()));
-        }
-
-        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
-            MessageFormatErrorHandleWay formatErrorWayOption =
-                    
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
-            switch (formatErrorWayOption) {
-                case FAIL:
-                case SKIP:
-                    this.messageFormatErrorHandleWay = formatErrorWayOption;
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        setDeserialization(config);
+        kafkaSourceConfig = new 
KafkaSourceConfig(ReadonlyConfig.fromConfig(config));
     }
 
     @Override
     public SeaTunnelRowType getProducedType() {

Review Comment:
   Can this method be removed? See the related PR:
   
   https://github.com/apache/seatunnel/pull/5625



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {

Review Comment:
   Can this method be removed? See the related PR:
   
   https://github.com/apache/seatunnel/pull/5625



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, TOPIC.key(), 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.metadata.setTopic(config.getString(TOPIC.key()));
-        if (config.hasPath(PATTERN.key())) {
-            this.metadata.setPattern(config.getBoolean(PATTERN.key()));
-        } else {
-            this.metadata.setPattern(PATTERN.defaultValue());
-        }
-        
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
-        this.metadata.setProperties(new Properties());
-
-        if (config.hasPath(CONSUMER_GROUP.key())) {
-            
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
-        } else {
-            this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
-        }
-
-        if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-            
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
-        } else {
-            
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
-        }
-
-        if (config.hasPath(START_MODE.key())) {
-            StartMode startMode =
-                    
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
-            this.metadata.setStartMode(startMode);
-            switch (startMode) {
-                case TIMESTAMP:
-                    long startOffsetsTimestamp = 
config.getLong(START_MODE_TIMESTAMP.key());
-                    long currentTimestamp = System.currentTimeMillis();
-                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > 
currentTimestamp) {
-                        throw new IllegalArgumentException(
-                                "start_mode.timestamp The value is smaller 
than 0 or smaller than the current time");
-                    }
-                    
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                    break;
-                case SPECIFIC_OFFSETS:
-                    Config offsets = 
config.getConfig(START_MODE_OFFSETS.key());
-                    ConfigRenderOptions options = 
ConfigRenderOptions.concise();
-                    String offsetsJson = offsets.root().render(options);
-                    if (offsetsJson == null) {
-                        throw new IllegalArgumentException(
-                                "start mode is "
-                                        + StartMode.SPECIFIC_OFFSETS
-                                        + "but no specific offsets were 
specified.");
-                    }
-                    Map<TopicPartition, Long> specificStartOffsets = new 
HashMap<>();
-                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
-                    jsonNodes
-                            .fieldNames()
-                            .forEachRemaining(
-                                    key -> {
-                                        int splitIndex = key.lastIndexOf("-");
-                                        String topic = key.substring(0, 
splitIndex);
-                                        String partition = 
key.substring(splitIndex + 1);
-                                        long offset = 
jsonNodes.get(key).asLong();
-                                        TopicPartition topicPartition =
-                                                new TopicPartition(
-                                                        topic, 
Integer.valueOf(partition));
-                                        
specificStartOffsets.put(topicPartition, offset);
-                                    });
-                    
this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
-            this.discoveryIntervalMillis =
-                    
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
-        }
-
-        if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
-            config.getObject(KAFKA_CONFIG.key())
-                    .forEach(
-                            (key, value) ->
-                                    this.metadata.getProperties().put(key, 
value.unwrapped()));
-        }
-
-        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
-            MessageFormatErrorHandleWay formatErrorWayOption =
-                    
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
-            switch (formatErrorWayOption) {
-                case FAIL:
-                case SKIP:
-                    this.messageFormatErrorHandleWay = formatErrorWayOption;
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        setDeserialization(config);
+        kafkaSourceConfig = new 
KafkaSourceConfig(ReadonlyConfig.fromConfig(config));
     }
 
     @Override
     public SeaTunnelRowType getProducedType() {

Review Comment:
   Can this method be removed? See the related PR:
   
   https://github.com/apache/seatunnel/pull/5625



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {

Review Comment:
   Can this method be removed? See the related PR:
   
   https://github.com/apache/seatunnel/pull/5625



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to