Hisoka-X commented on code in PR #5635:
URL: https://github.com/apache/seatunnel/pull/5635#discussion_r1364832952
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -105,127 +67,36 @@ public String getPluginName() {
@Override
public void prepare(Config config) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(config, TOPIC.key(),
BOOTSTRAP_SERVERS.key());
- if (!result.isSuccess()) {
- throw new KafkaConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- this.metadata.setTopic(config.getString(TOPIC.key()));
- if (config.hasPath(PATTERN.key())) {
- this.metadata.setPattern(config.getBoolean(PATTERN.key()));
- } else {
- this.metadata.setPattern(PATTERN.defaultValue());
- }
-
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
- this.metadata.setProperties(new Properties());
-
- if (config.hasPath(CONSUMER_GROUP.key())) {
-
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
- } else {
- this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
- }
-
- if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {
-
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT.key()));
- } else {
-
this.metadata.setCommitOnCheckpoint(COMMIT_ON_CHECKPOINT.defaultValue());
- }
-
- if (config.hasPath(START_MODE.key())) {
- StartMode startMode =
-
StartMode.valueOf(config.getString(START_MODE.key()).toUpperCase());
- this.metadata.setStartMode(startMode);
- switch (startMode) {
- case TIMESTAMP:
- long startOffsetsTimestamp =
config.getLong(START_MODE_TIMESTAMP.key());
- long currentTimestamp = System.currentTimeMillis();
- if (startOffsetsTimestamp < 0 || startOffsetsTimestamp >
currentTimestamp) {
- throw new IllegalArgumentException(
- "start_mode.timestamp The value is smaller
than 0 or smaller than the current time");
- }
-
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
- break;
- case SPECIFIC_OFFSETS:
- Config offsets =
config.getConfig(START_MODE_OFFSETS.key());
- ConfigRenderOptions options =
ConfigRenderOptions.concise();
- String offsetsJson = offsets.root().render(options);
- if (offsetsJson == null) {
- throw new IllegalArgumentException(
- "start mode is "
- + StartMode.SPECIFIC_OFFSETS
- + "but no specific offsets were
specified.");
- }
- Map<TopicPartition, Long> specificStartOffsets = new
HashMap<>();
- ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
- jsonNodes
- .fieldNames()
- .forEachRemaining(
- key -> {
- int splitIndex = key.lastIndexOf("-");
- String topic = key.substring(0,
splitIndex);
- String partition =
key.substring(splitIndex + 1);
- long offset =
jsonNodes.get(key).asLong();
- TopicPartition topicPartition =
- new TopicPartition(
- topic,
Integer.valueOf(partition));
-
specificStartOffsets.put(topicPartition, offset);
- });
-
this.metadata.setSpecificStartOffsets(specificStartOffsets);
- break;
- default:
- break;
- }
- }
-
- if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
- this.discoveryIntervalMillis =
-
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
- }
-
- if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
- config.getObject(KAFKA_CONFIG.key())
- .forEach(
- (key, value) ->
- this.metadata.getProperties().put(key,
value.unwrapped()));
- }
-
- if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
- MessageFormatErrorHandleWay formatErrorWayOption =
-
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
- switch (formatErrorWayOption) {
- case FAIL:
- case SKIP:
- this.messageFormatErrorHandleWay = formatErrorWayOption;
- break;
- default:
- break;
- }
- }
-
- setDeserialization(config);
+ kafkaSourceConfig = new
KafkaSourceConfig(ReadonlyConfig.fromConfig(config));
}
@Override
public SeaTunnelRowType getProducedType() {
Review Comment:
This method can not be removed for now. I need to do some extra work on the
translation layer in the next PR before I can properly remove it. For this PR,
it is better to keep it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]