Hisoka-X commented on code in PR #5635:
URL: https://github.com/apache/seatunnel/pull/5635#discussion_r1364832952


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(config, TOPIC.key(), 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.metadata.setTopic(config.getString(TOPIC.key()));
-        if (config.hasPath(PATTERN.key())) {
-            this.metadata.setPattern(config.getBoolean(PATTERN.key()));
-        } else {
-            this.metadata.setPattern(PATTERN.defaultValue());
-        }
-        
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
-        this.metadata.setProperties(new Properties());
-
-        if (config.hasPath(CONSUMER_GROUP.key())) {
-            
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
-        } else {
-            this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
-        }
-
-        if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-            
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
-        } else {
-            
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
-        }
-
-        if (config.hasPath(START_MODE.key())) {
-            StartMode startMode =
-                    
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
-            this.metadata.setStartMode(startMode);
-            switch (startMode) {
-                case TIMESTAMP:
-                    long startOffsetsTimestamp = 
config.getLong(START_MODE_TIMESTAMP.key());
-                    long currentTimestamp = System.currentTimeMillis();
-                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > 
currentTimestamp) {
-                        throw new IllegalArgumentException(
-                                "start_mode.timestamp The value is smaller 
than 0 or smaller than the current time");
-                    }
-                    
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                    break;
-                case SPECIFIC_OFFSETS:
-                    Config offsets = 
config.getConfig(START_MODE_OFFSETS.key());
-                    ConfigRenderOptions options = 
ConfigRenderOptions.concise();
-                    String offsetsJson = offsets.root().render(options);
-                    if (offsetsJson == null) {
-                        throw new IllegalArgumentException(
-                                "start mode is "
-                                        + StartMode.SPECIFIC_OFFSETS
-                                        + "but no specific offsets were 
specified.");
-                    }
-                    Map<TopicPartition, Long> specificStartOffsets = new 
HashMap<>();
-                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
-                    jsonNodes
-                            .fieldNames()
-                            .forEachRemaining(
-                                    key -> {
-                                        int splitIndex = key.lastIndexOf("-");
-                                        String topic = key.substring(0, 
splitIndex);
-                                        String partition = 
key.substring(splitIndex + 1);
-                                        long offset = 
jsonNodes.get(key).asLong();
-                                        TopicPartition topicPartition =
-                                                new TopicPartition(
-                                                        topic, 
Integer.valueOf(partition));
-                                        
specificStartOffsets.put(topicPartition, offset);
-                                    });
-                    
this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
-            this.discoveryIntervalMillis =
-                    
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
-        }
-
-        if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
-            config.getObject(KAFKA_CONFIG.key())
-                    .forEach(
-                            (key, value) ->
-                                    this.metadata.getProperties().put(key, 
value.unwrapped()));
-        }
-
-        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
-            MessageFormatErrorHandleWay formatErrorWayOption =
-                    
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
-            switch (formatErrorWayOption) {
-                case FAIL:
-                case SKIP:
-                    this.messageFormatErrorHandleWay = formatErrorWayOption;
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        setDeserialization(config);
+        kafkaSourceConfig = new 
KafkaSourceConfig(ReadonlyConfig.fromConfig(config));
     }
 
     @Override
     public SeaTunnelRowType getProducedType() {

Review Comment:
   This method cannot be removed for now. I need to do some extra work on the 
   translation layer in the next PR before I can properly remove it. For this PR, 
   it is better to keep it as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to